Chạy Ứng Dụng Flink trong Môi Trường Địa Phương và Phân Tán

Để phát triển và kiểm thử ứng dụng xử lý dữ liệu theo lô với Apache Flink, việc lựa chọn môi trường thực thi phù hợp là bước then chốt. Flink cung cấp nhiều cách tiếp cận: từ chạy nội bộ trên một JVM duy nhất đến triển khai phân tán trên cụm máy chủ. Mỗi phương thức phục vụ mục đích riêng — từ debug nhanh, tích hợp kiểm thử tự động đến triển khai sản xuất quy mô lớn.

Môi trường thực thi cục bộ

LocalEnvironment cho phép khởi chạy job Flink trực tiếp trong tiến trình JVM hiện tại, không cần thiết lập cụm hay dịch vụ phụ trợ. Đây là lựa chọn lý tưởng khi phát triển nhanh hoặc tích hợp vào hệ thống khác như ứng dụng Spring Boot hoặc framework kiểm thử.

Một instance LocalEnvironment được tạo bằng ExecutionEnvironment.createLocalEnvironment(). Mặc định, mức độ song song (parallelism) sẽ khớp với số lõi CPU vật lý. Người dùng có thể ghi đè thông số này để mô phỏng hành vi dưới tải thấp hơn hoặc kiểm tra logic xử lý theo luồng:

object BatchLocalRunner {
  def main(args: Array[String]): Unit = {
    // Khởi tạo môi trường địa phương với độ song song tùy chỉnh
    val localEnv = ExecutionEnvironment.createLocalEnvironment(parallelism = 4)
    
    // Đọc dữ liệu CSV với cấu hình phân tích rõ ràng
    val inputPath = "input/sample-data.csv"
    val dataset = localEnv.readCsvFile[(String, String, Int, Double, String, Long)](
      filePath = inputPath,
      lineDelimiter = "\n",
      fieldDelimiter = ",",
      ignoreFirstLine = true,
      includedFields = Array(0, 1, 2, 3, 4, 5)
    )
    
    // Nhóm theo hai trường đầu và lấy 50 bản ghi đại diện
    dataset.groupBy(0, 1).first(50).print("Local Execution Result")
  }
}

Lưu ý rằng LocalEnvironment không kích hoạt giao diện web giám sát (JobManager UI), do đó mọi thông tin trạng thái chỉ có thể truy xuất qua log console hoặc API lập trình.

Môi trường dựa trên tập hợp dữ liệu

Khi khối lượng dữ liệu nhỏ và yêu cầu tốc độ phản hồi tức thì — ví dụ trong unit test hoặc REPL tương tác — CollectionEnvironment là lựa chọn tối ưu. Nó bỏ qua cơ chế phân phối, scheduling và serialization phức tạp, thay vào đó xử lý dữ liệu như một chuỗi các thao tác trên collection thuần Java/Scala.

Do đặc tính đơn luồng và không phân tán, môi trường này chỉ phù hợp với tập dữ liệu vừa đủ nằm gọn trong heap JVM (thường dưới vài trăm MB). Không nên sử dụng nó cho benchmark hiệu năng hoặc mô phỏng hành vi cụm.

import org.apache.flink.api.scala._

val testEnv = ExecutionEnvironment.createCollectionsEnvironment

val testData = List(
  ("A", "X", 100, 12.5),
  ("B", "Y", 200, 18.3),
  ("A", "X", 150, 14.1)
)

val ds = testEnv.fromCollection(testData)
ds.groupBy(0, 1)
  .reduce { (a, b) => (a._1, a._2, a._3 + b._3, (a._4 + b._4) / 2.0) }
  .print("Aggregated Collection Result")

Triển khai trên cụm phân tán

Khi chuyển sang môi trường sản xuất, ứng dụng Flink cần được gửi tới cụm quản lý bởi JobManager và TaskManager. Có hai cơ chế chính để thực hiện điều này:

1. Gửi job qua CLI

Sử dụng script flink đi kèm trong thư mục bin/, người dùng có thể submit jar đã đóng gói trực tiếp tới cụm đang hoạt động:

./bin/flink run \
  --class com.example.batch.WordCountApp \
  --parallelism 8 \
  target/batch-job-1.0.jar \
  --input hdfs://namenode:9000/input/data.txt \
  --output hdfs://namenode:9000/output/count-result

2. Thực thi từ mã nguồn với RemoteEnvironment

Trong một số kịch bản như CI/CD pipeline hoặc hệ thống điều khiển tự động, việc gọi trực tiếp từ code là cần thiết. ExecutionEnvironment.createRemoteEnvironment() cho phép kết nối tới một cụm Flink đang chạy và gửi job mà không cần build lại jar thủ công mỗi lần:

val clusterEnv = ExecutionEnvironment.createRemoteEnvironment(
  host = "jobmanager-host",
  port = 6123,
  jarFiles = Seq("target/distributed-job-1.0.jar")
)

val source = clusterEnv.readTextFile("hdfs://namenode:9000/logs/access.log")
val wordCounts = source
  .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
  .map((_, 1))
  .groupBy(0)
  .sum(1)

wordCounts.writeAsCsv("hdfs://namenode:9000/output/word-freq", "\n", ",")

clusterEnv.execute("Distributed Word Frequency Analysis")

Để đảm bảo tính di động của job, cấu hình Maven cần được thiết lập đúng để đóng gói cả dependencies và xác định lớp nhập vào (main class):

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.4.1</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <transformers>
          <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
            <mainClass>com.example.batch.BatchEntryPoint</mainClass>
          </transformer>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>

Thẻ: Flink batch-processing local-execution remote-execution apache-flink

Đăng vào ngày 6 tháng 6 lúc 00:39