Để phát triển và kiểm thử ứng dụng xử lý dữ liệu theo lô với Apache Flink, việc lựa chọn môi trường thực thi phù hợp là bước then chốt. Flink cung cấp nhiều cách tiếp cận: từ chạy nội bộ trên một JVM duy nhất đến triển khai phân tán trên cụm máy chủ. Mỗi phương thức phục vụ mục đích riêng — từ debug nhanh, tích hợp kiểm thử tự động đến triển khai sản xuất quy mô lớn.
Môi trường thực thi cục bộ
LocalEnvironment cho phép khởi chạy job Flink trực tiếp trong tiến trình JVM hiện tại, không cần thiết lập cụm hay dịch vụ phụ trợ. Đây là lựa chọn lý tưởng khi phát triển nhanh hoặc tích hợp vào hệ thống khác như ứng dụng Spring Boot hoặc framework kiểm thử.
Một instance LocalEnvironment được tạo bằng ExecutionEnvironment.createLocalEnvironment(). Mặc định, mức độ song song (parallelism) sẽ khớp với số lõi CPU vật lý. Người dùng có thể ghi đè thông số này để mô phỏng hành vi dưới tải thấp hơn hoặc kiểm tra logic xử lý theo luồng:
object BatchLocalRunner {
def main(args: Array[String]): Unit = {
// Khởi tạo môi trường địa phương với độ song song tùy chỉnh
val localEnv = ExecutionEnvironment.createLocalEnvironment(parallelism = 4)
// Đọc dữ liệu CSV với cấu hình phân tích rõ ràng
val inputPath = "input/sample-data.csv"
val dataset = localEnv.readCsvFile[(String, String, Int, Double, String, Long)](
filePath = inputPath,
lineDelimiter = "\n",
fieldDelimiter = ",",
ignoreFirstLine = true,
includedFields = Array(0, 1, 2, 3, 4, 5)
)
// Nhóm theo hai trường đầu và lấy 50 bản ghi đại diện
dataset.groupBy(0, 1).first(50).print("Local Execution Result")
}
}
Lưu ý rằng LocalEnvironment không kích hoạt giao diện web giám sát (JobManager UI), do đó mọi thông tin trạng thái chỉ có thể truy xuất qua log console hoặc API lập trình.
Môi trường dựa trên tập hợp dữ liệu
Khi khối lượng dữ liệu nhỏ và yêu cầu tốc độ phản hồi tức thì — ví dụ trong unit test hoặc REPL tương tác — CollectionEnvironment là lựa chọn tối ưu. Nó bỏ qua cơ chế phân phối, scheduling và serialization phức tạp, thay vào đó xử lý dữ liệu như một chuỗi các thao tác trên collection thuần Java/Scala.
Do đặc tính đơn luồng và không phân tán, môi trường này chỉ phù hợp với tập dữ liệu vừa đủ nằm gọn trong heap JVM (thường dưới vài trăm MB). Không nên sử dụng nó cho benchmark hiệu năng hoặc mô phỏng hành vi cụm.
import org.apache.flink.api.scala._
val testEnv = ExecutionEnvironment.createCollectionsEnvironment
val testData = List(
("A", "X", 100, 12.5),
("B", "Y", 200, 18.3),
("A", "X", 150, 14.1)
)
val ds = testEnv.fromCollection(testData)
ds.groupBy(0, 1)
.reduce { (a, b) => (a._1, a._2, a._3 + b._3, (a._4 + b._4) / 2.0) }
.print("Aggregated Collection Result")
Triển khai trên cụm phân tán
Khi chuyển sang môi trường sản xuất, ứng dụng Flink cần được gửi tới cụm quản lý bởi JobManager và TaskManager. Có hai cơ chế chính để thực hiện điều này:
1. Gửi job qua CLI
Sử dụng script flink đi kèm trong thư mục bin/, người dùng có thể submit jar đã đóng gói trực tiếp tới cụm đang hoạt động:
./bin/flink run \
--class com.example.batch.WordCountApp \
--parallelism 8 \
target/batch-job-1.0.jar \
--input hdfs://namenode:9000/input/data.txt \
--output hdfs://namenode:9000/output/count-result
2. Thực thi từ mã nguồn với RemoteEnvironment
Trong một số kịch bản như CI/CD pipeline hoặc hệ thống điều khiển tự động, việc gọi trực tiếp từ code là cần thiết. ExecutionEnvironment.createRemoteEnvironment() cho phép kết nối tới một cụm Flink đang chạy và gửi job mà không cần build lại jar thủ công mỗi lần:
val clusterEnv = ExecutionEnvironment.createRemoteEnvironment(
host = "jobmanager-host",
port = 6123,
jarFiles = Seq("target/distributed-job-1.0.jar")
)
val source = clusterEnv.readTextFile("hdfs://namenode:9000/logs/access.log")
val wordCounts = source
.flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
.map((_, 1))
.groupBy(0)
.sum(1)
wordCounts.writeAsCsv("hdfs://namenode:9000/output/word-freq", "\n", ",")
clusterEnv.execute("Distributed Word Frequency Analysis")
Để đảm bảo tính di động của job, cấu hình Maven cần được thiết lập đúng để đóng gói cả dependencies và xác định lớp nhập vào (main class):
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.4.1</version>
<executions>
<execution>
<phase>package</phase>
<goals><goal>shade</goal></goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.example.batch.BatchEntryPoint</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>