Biến phát sóng trong Flink

Flink cung cấp khả năng sử dụng biến phát sóng, cho phép dữ liệu được phát tới các taskmanager cụ thể và lưu trữ trong bộ nhớ. Điều này giúp giảm thiểu các thao tác shuffle lớn.

Trong giai đoạn join dữ liệu, thường phải thực hiện nhiều thao tác shuffle. Để tối ưu hóa, một dataSet có thể được phát sóng trực tiếp vào bộ nhớ của taskManager, từ đó truy cập dữ liệu ngay trong bộ nhớ mà không cần thực hiện quá nhiều thao tác shuffle, cải thiện hiệu suất của cụm máy tính.

Lưu ý: Vì biến phát sóng yêu cầu dataset được lưu trữ trong bộ nhớ, nên kích thước dữ liệu phát sóng không nên quá lớn để tránh lỗi Out Of Memory (OOM).

Đăng ký: Sử dụng withBroadcastSet(dataset, string) để đăng ký biến phát sóng

Truy cập: Sử dụng getRuntimeContext().getBroadcastVariable(String) để lấy biến phát sóng
/**
  * Được tạo bởi developer;
  */
object UseBroadcast {
  def main(args: Array[String]): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    //TODO Kết hợp dataA với dataB bằng cách sử dụng biến phát sóng
    val dataA = new mutable.MutableList[(Int, String, Double)]
    dataA.+=((1, "Xin chào", 1.0))
    dataA.+=((2, "Chào bạn", 2.0))
    dataA.+=((3, "Chào thế giới", 3.0))
    val dsA = env.fromCollection(Random.shuffle(dataA))
    val dataB = new mutable.MutableList[(Int, String, Int, Double, String)]
    dataB.+=((1, "Greeting", 1, 1.5, "First"))
    dataB.+=((2, "Hello World", 2, 2.5, "Second"))
    dataB.+=((3, "Hi there", 3, 3.5, "Third"))
    val dsB = env.fromCollection(Random.shuffle(dataB))

    //todo Sử dụng lớp RichMapFunction nội bộ để hoàn thành kết nối
    val result = dsA.map(new RichMapFunction[(Int, String, Double), ArrayBuffer[(Int, String, Double, String)]] {

      var broadcastData:mutable.Buffer[(Int, String, Int, Double, String)] = null

      override def open(parameters: Configuration): Unit = {
        import scala.collection.JavaConverters._
        //asScala yêu cầu chuyển đổi ngầm
        broadcastData = this.getRuntimeContext.getBroadcastVariable[(Int, String, Int, Double, String)]("dsB").asScala
      }

      override def map(value: (Int, String, Double)):ArrayBuffer[(Int, String, Double, String)] = {
        val toArr: Array[(Int, String, Int, Double, String)] = broadcastData.toArray
        val buffer = new mutable.ArrayBuffer[(Int, String, Double, String)]
        var idx = 0

        var res:(Int, String, Double, String) = null
        while(idx < toArr.length){
          if(value._1 == toArr(idx)._1){
            res = (value._1, value._2, value._3, toArr(idx)._5)
            buffer += res
          }
          idx += 1
        }
        buffer
      }
    }).withBroadcastSet(dsB, "dsB")
    println(result.collect())
  }
}

View Code

Thẻ: Flink BroadcastVariables Scala DataProcessing

Đăng vào ngày 21 tháng 5 lúc 22:47