Thiết lập Maven cho dự án Flink
Để bắt đầu phát triển ứng dụng với Apache Flink, bạn cần khai báo các thư viện cần thiết trong tệp pom.xml. Dưới đây là cấu hình cơ bản cho phiên bản Flink 1.17.0:
<properties>
<flink.version>1.17.0</flink.version>
<java.version>1.8</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- API xử lý luồng Java của Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Thư viện client để chạy cục bộ -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
Ví dụ xử lý luồng: Đếm từ (WordCount)
Dưới đây là đoạn mã thực hiện đếm từ từ một tệp tin đầu vào. Logic sẽ chia dòng thành các từ, gán giá trị đếm ban đầu là 1, nhóm theo từ và tính tổng.
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class FileWordCountJob {
public static void main(String[] args) throws Exception {
// Khởi tạo môi trường thực thi luồng
final StreamExecutionEnvironment executionEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// Đọc dữ liệu từ đường dẫn tệp chỉ định
DataStreamSource<String> inputStream = executionEnv.readTextFile("path/to/input/data.txt");
// Thực hiện biến đổi dữ liệu: làm phẳng -> nhóm -> tổng hợp
SingleOutputStreamOperator<Tuple2<String, Integer>> resultStream = inputStream
.flatMap(new Tokenizer())
.keyBy(value -> value.f0)
.sum("f1");
// Xuất kết quả ra console
resultStream.print();
// Kích hoạt thực thi job
executionEnv.execute("File Word Count");
}
// Lớp tách từ
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// Chuẩn hóa văn bản và tách theo khoảng trắng
String[] tokens = value.toLowerCase().split("\\W+");
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<>(token, 1));
}
}
}
}
}
Ví dụ đọc từ Socket với biểu thức Lambda
Ví dụ này minh họa việc đọc dữ liệu trực tiếp từ một socket mạng. Chúng ta sẽ sử dụng biểu thức Lambda để làm cho mã nguồn ngắn gọn hơn, tuy nhiên cần chỉ định rõ kiểu trả về bằng .returns() do Java xóa kiểu khi biên dịch (type erasure).
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class SocketTextProcessor {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Kết nối đến socket tại localhost:9999
DataStreamSource<String> socketStream = env.socketTextStream("localhost", 9999);
SingleOutputStreamOperator<Tuple2<String, Long>> aggregated = socketStream
.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
String[] words = line.split("\\s");
for (String w : words) {
if (!w.isEmpty()) {
out.collect(Tuple2.of(w, 1L));
}
}
})
.returns(Types.TUPLE(Types.STRING, Types.LONG)) // Cần thiết khi dùng Lambda
.keyBy(tuple -> tuple.f0)
.sum(1);
aggregated.print();
env.execute("Socket Word Aggregation");
}
}
Triển khai Flink trên YARN
Khi chạy trên cụm YARN, Flink sẽ tự động phân bổ tài nguyên cho TaskManager dựa trên yêu cầu Slot của Job chạy trên JobManager.
Để khởi động một phiên làm việc Flink trên YARN:
- Khởi động cụm Hadoop (HDFS và YARN).
- Chạy lệnh YARN session: Script này sẽ gửi yêu cầu cấp phát tài nguyên đến YARN và khởi tạo Flink cluster.
bin/yarn-session.sh -nm flink-session-01 -d
Các tham số quan trọng:
-d: Chế độ tách (detached), cho phép session chạy nền sau khi thoát terminal.-jmhoặc--jobManagerMemory: Thiết lập bộ nhớ cho JobManager (mặc định MB).-nmhoặc--name: Tên hiển thị của ứng dụng trên giao diện YARN.-quhoặc--queue: Chỉ định hàng đợi YARN để chạy job.-tmhoặc--taskManagerMemory: Thiết lập bộ nhớ cho mỗi TaskManager.
Cấu hình History Server
Mặc định, khi cụm Flink dừng lại, giao diện Web UI sẽ không còn truy cập được để kiểm tra lịch sử chạy job. History Server giải quyết vấn đề này bằng cách lưu trữ thông tin thống kê của các job đã hoàn thành (dù bình thường hay gặp lỗi) lên hệ thống tệp tin (như HDFS). Điều này giúp phục hồi lại Web UI để kiểm tra Checkpoint cuối cùng hay cấu hình runtime.
Các bước thiết lập:
- Tạo thư mục lưu trữ trên HDFS:
hadoop fs -mkdir -p /flink/logs/archives - Cấu hình trong
flink-conf.yaml:
# Đường dẫn lưu trữ archive của JobManager jobmanager.archive.fs.dir: hdfs://namenode:8020/flink/logs/archives # Cấu hình History Server historyserver.web.address: namenode historyserver.web.port: 8082 historyserver.archive.fs.dir: hdfs://namenode:8020/flink/logs/archives historyserver.archive.fs.refresh-interval: 10000 - Khởi động/dừng History Server:
bin/historyserver.sh start
bin/historyserver.sh stop
Mechanism chia sẻ Slot (Slot Sharing)
Trong Flink, chỉ các tác vụ con (subtask) thuộc cùng một Slot Sharing Group mới có thể chạy chung trên một Slot. Các nhóm khác nhau sẽ được cô lập hoàn toàn và bắt buộc phải chiếm các Slot vật lý khác nhau.
Sử dụng DataGen Connector tạo dữ liệu giả
Kể từ phiên bản 1.11, Flink cung cấp kết nối DataGen để tạo dữ liệu ngẫu nhiên phục vụ việc kiểm thử hoặc đo hiệu năng mà không cần nguồn dữ liệu thật. Với Flink 1.17+, chúng ta sử dụng cách viết mới cho Source.
Dependency Maven:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-datagen</artifactId>
<version>${flink.version}</version>
</dependency>
Ví dụ mã nguồn:
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
import org.apache.flink.streaming.api.functions.source.datagen.GeneratorFunction;
import org.apache.flink.streaming.api.watermark.strategy.WatermarkStrategy;
public class SyntheticDataGen {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataGeneratorSource<String> generator = new DataGeneratorSource<>(
new GeneratorFunction<Long, String>() {
@Override
public String map(Long value) {
return "Event-ID-" + value;
}
},
Long.MAX_VALUE,
org.apache.flink.streaming.api.functions.source.datagen.RateLimiterStrategy.perSecond(5),
Types.STRING
);
DataStreamSource<String> stream = env.fromSource(
generator,
WatermarkStrategy.noWatermarks(),
"random-generator"
);
stream.print();
env.execute("Data Generator Test");
}
}
Các hàm tổng hợp (Aggregation)
POJO (Plain Old Java Object) là các đối tượng Java đơn giản, không kế thừa từ các lớp đặc biệt hoặc bị framework xâm nhập.
Các toán tử tổng hợp thường dùng phải được gọi sau keyBy:
- sum(): Cộng dồn giá trị của trường chỉ định.
- min(): Tìm giá trị nhỏ nhất của trường chỉ định (các trường khác giữ giá trị của bản ghi đầu tiên).
- max(): Tìm giá trị lớn nhất của trường chỉ định (giữ nguyên các trường khác như bản ghi đầu).
- minBy(): Tìm bản ghi có giá trị nhỏ nhất (trả về toàn bộ bản ghi đó).
- maxBy(): Tìm bản ghi có giá trị lớn nhất (trả về toàn bộ bản ghi đó).
Lập trình có trạng thái với Rich Function
Rich Function cung cấp các phương thức vòng đời (lifecycle methods) cho phép quản lý tài nguyên trước và sau khi xử lý dữ liệu:
open(): Được gọi 1 lần khi khởi tạo subtask (trước khimap()hoặcfilter()chạy).close(): Được gọi 1 lần khi subtask kết thúc, thường dùng để dọn dẹp tài nguyên.
Ví dụ:
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class RichFunctionLifecycleDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
DataStream<Integer> data = env.fromElements(10, 20, 30, 40);
data.map(new RichMapFunction<Integer, String>() {
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
System.out.println("Subtask " + getRuntimeContext().getIndexOfThisSubtask() + " started.");
}
@Override
public String map(Integer value) throws Exception {
return "Processed: " + (value * 2);
}
@Override
public void close() throws Exception {
super.close();
System.out.println("Subtask " + getRuntimeContext().getIndexOfThisSubtask() + " finished.");
}
}).print();
env.execute("Rich Function Example");
}
}
Ghi dữ liệu ra Kafka (Sink)
Để ghi dữ liệu vào Kafka, bạn cần thêm dependency flink-connector-kafka và khởi động Kafka cluster.
1. Ghi record không có Key
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.kafka.clients.producer.ProducerConfig;
public class KafkaSinkProducer {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Bắt buộc bật Checkpoint để hỗ trợ Exactly-Once
env.enableCheckpointing(3000, CheckpointingMode.EXACTLY_ONCE);
SingleOutputStreamOperator<String> sourceData = env.socketTextStream("localhost", 7777);
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers("broker1:9092,broker2:9092")
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic("output-topic")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
// Đảm bảo Exactly-Once
.setDeliveryGuarantee(org.apache.flink.connector.base.DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("flink-tx-")
.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 15 * 60 * 1000 + "")
.build();
sourceData.sinkTo(sink);
env.execute("Write to Kafka");
}
}
2. Ghi record có Key tùy chỉnh
Để xác định phân vùng (partition) dựa trên key, bạn cần triển khai giao thức serialization tùy chỉnh:
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.nio.charset.StandardCharsets;
public class KafkaKeyedSink {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SingleOutputStreamOperator<String> inputData = env.socketTextStream("localhost", 7777);
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers("broker1:9092")
.setRecordSerializer(new KafkaRecordSerializationSchema<String>() {
@Override
public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
String[] parts = element.split(",");
// Giả sử phần tử đầu là Key
byte[] key = parts[0].getBytes(StandardCharsets.UTF_8);
byte[] value = element.getBytes(StandardCharsets.UTF_8);
return new ProducerRecord<>("output-topic", key, value);
}
})
.setDeliveryGuarantee(org.apache.flink.connector.base.DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("keyed-tx-")
.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 900000 + "")
.build();
inputData.sinkTo(sink);
env.execute();
}
}
Cửa sổ thời gian (Windowing) và KeyBy
Toán tử keyBy phân chia dữ liệu thành các luồng logic theo khóa, là bước bắt buộc trước khi thực hiện các trạng thái (state) hoặc tổng hợp. Sau keyBy, dữ liệu được xử lý trong các cửa sổ để giới hạn phạm vi tính toán.
Các loại cửa sổ phổ biến:
- Tumbling Window: Cửa sổ xếp chồng, không chồng lấn.
- Sliding Window: Cửa sổ trượt, có khoảng chồng lấn.
- Session Window: Dựa trên khoảng thời gian hoạt động của session.
- Global Window: Cửa sổ toàn cục.
Xử lý thời gian (Event Time) và Watermark
Trong xử lý luồng, Event Time là thời điểm sự kiện thực sự xảy ra (thường nằm trong dữ liệu). Flink sử dụng cơ chế Watermark để xử lý dữ liệu đến muộn (out-of-order). Watermark đóng vai trò như một đồng hồ nội bộ, báo hiệu cho Flink rằng không có dữ liệu có timestamp nhỏ hơn mốc thời gian này sẽ đến nữa.
Ví dụ: Nếu Watermark hiện tại là 10:00:05, các sự kiện có thời gian 10:00:00 (đã trễ) vẫn có thể được xử lý nếu nằm trong dung sai cho phép.
Kiểm điểm (Checkpointing)
Checkpoint là cơ chế sao lưu trạng thái của ứng dụng Flink vào một hệ thống lưu trữ ổn định (như HDFS). Điều này đảm bảo tính toàn vẹn của dữ liệu và cho phép khôi phục lại trạng thái trước đó nếu hệ thống gặp lỗi (fault tolerance).