Flink cung cấp khả năng sử dụng biến phát sóng, cho phép dữ liệu được phát tới các taskmanager cụ thể và lưu trữ trong bộ nhớ. Điều này giúp giảm thiểu các thao tác shuffle lớn.
Trong giai đoạn join dữ liệu, thường phải thực hiện nhiều thao tác shuffle. Để tối ưu hóa, một dataSet có thể được phát sóng trực tiếp vào bộ nhớ của taskManager, từ đó truy cập dữ liệu ngay trong bộ nhớ mà không cần thực hiện quá nhiều thao tác shuffle, cải thiện hiệu suất của cụm máy tính.
Lưu ý: Vì biến phát sóng yêu cầu dataset được lưu trữ trong bộ nhớ, nên kích thước dữ liệu phát sóng không nên quá lớn để tránh lỗi Out Of Memory (OOM).
Đăng ký: Sử dụng withBroadcastSet(dataset, string) để đăng ký biến phát sóng
Truy cập: Sử dụng getRuntimeContext().getBroadcastVariable(String) để lấy biến phát sóng
/**
* Được tạo bởi developer;
*/
object UseBroadcast {
def main(args: Array[String]): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
//TODO Kết hợp dataA với dataB bằng cách sử dụng biến phát sóng
val dataA = new mutable.MutableList[(Int, String, Double)]
dataA.+=((1, "Xin chào", 1.0))
dataA.+=((2, "Chào bạn", 2.0))
dataA.+=((3, "Chào thế giới", 3.0))
val dsA = env.fromCollection(Random.shuffle(dataA))
val dataB = new mutable.MutableList[(Int, String, Int, Double, String)]
dataB.+=((1, "Greeting", 1, 1.5, "First"))
dataB.+=((2, "Hello World", 2, 2.5, "Second"))
dataB.+=((3, "Hi there", 3, 3.5, "Third"))
val dsB = env.fromCollection(Random.shuffle(dataB))
//todo Sử dụng lớp RichMapFunction nội bộ để hoàn thành kết nối
val result = dsA.map(new RichMapFunction[(Int, String, Double), ArrayBuffer[(Int, String, Double, String)]] {
var broadcastData:mutable.Buffer[(Int, String, Int, Double, String)] = null
override def open(parameters: Configuration): Unit = {
import scala.collection.JavaConverters._
//asScala yêu cầu chuyển đổi ngầm
broadcastData = this.getRuntimeContext.getBroadcastVariable[(Int, String, Int, Double, String)]("dsB").asScala
}
override def map(value: (Int, String, Double)):ArrayBuffer[(Int, String, Double, String)] = {
val toArr: Array[(Int, String, Int, Double, String)] = broadcastData.toArray
val buffer = new mutable.ArrayBuffer[(Int, String, Double, String)]
var idx = 0
var res:(Int, String, Double, String) = null
while(idx < toArr.length){
if(value._1 == toArr(idx)._1){
res = (value._1, value._2, value._3, toArr(idx)._5)
buffer += res
}
idx += 1
}
buffer
}
}).withBroadcastSet(dsB, "dsB")
println(result.collect())
}
}
View Code