Hướng dẫn Lập trình và Cấu hình Apache Flink

Thiết lập Maven cho dự án Flink

Để bắt đầu phát triển ứng dụng với Apache Flink, bạn cần khai báo các thư viện cần thiết trong tệp pom.xml. Dưới đây là cấu hình cơ bản cho phiên bản Flink 1.17.0:

<properties>
    <flink.version>1.17.0</flink.version>
    <java.version>1.8</java.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
    <!-- API xử lý luồng Java của Flink -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Thư viện client để chạy cục bộ -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

Ví dụ xử lý luồng: Đếm từ (WordCount)

Dưới đây là đoạn mã thực hiện đếm từ từ một tệp tin đầu vào. Logic sẽ chia dòng thành các từ, gán giá trị đếm ban đầu là 1, nhóm theo từ và tính tổng.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FileWordCountJob {
    public static void main(String[] args) throws Exception {
        // Khởi tạo môi trường thực thi luồng
        final StreamExecutionEnvironment executionEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        // Đọc dữ liệu từ đường dẫn tệp chỉ định
        DataStreamSource<String> inputStream = executionEnv.readTextFile("path/to/input/data.txt");

        // Thực hiện biến đổi dữ liệu: làm phẳng -> nhóm -> tổng hợp
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultStream = inputStream
            .flatMap(new Tokenizer())
            .keyBy(value -> value.f0)
            .sum("f1");

        // Xuất kết quả ra console
        resultStream.print();

        // Kích hoạt thực thi job
        executionEnv.execute("File Word Count");
    }

    // Lớp tách từ
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // Chuẩn hóa văn bản và tách theo khoảng trắng
            String[] tokens = value.toLowerCase().split("\\W+");
            
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}

Ví dụ đọc từ Socket với biểu thức Lambda

Ví dụ này minh họa việc đọc dữ liệu trực tiếp từ một socket mạng. Chúng ta sẽ sử dụng biểu thức Lambda để làm cho mã nguồn ngắn gọn hơn, tuy nhiên cần chỉ định rõ kiểu trả về bằng .returns() do Java xóa kiểu khi biên dịch (type erasure).

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SocketTextProcessor {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kết nối đến socket tại localhost:9999
        DataStreamSource<String> socketStream = env.socketTextStream("localhost", 9999);

        SingleOutputStreamOperator<Tuple2<String, Long>> aggregated = socketStream
            .flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                String[] words = line.split("\\s");
                for (String w : words) {
                    if (!w.isEmpty()) {
                        out.collect(Tuple2.of(w, 1L));
                    }
                }
            })
            .returns(Types.TUPLE(Types.STRING, Types.LONG)) // Cần thiết khi dùng Lambda
            .keyBy(tuple -> tuple.f0)
            .sum(1);

        aggregated.print();

        env.execute("Socket Word Aggregation");
    }
}

Triển khai Flink trên YARN

Khi chạy trên cụm YARN, Flink sẽ tự động phân bổ tài nguyên cho TaskManager dựa trên yêu cầu Slot của Job chạy trên JobManager.

Để khởi động một phiên làm việc Flink trên YARN:

  1. Khởi động cụm Hadoop (HDFS và YARN).
  2. Chạy lệnh YARN session: Script này sẽ gửi yêu cầu cấp phát tài nguyên đến YARN và khởi tạo Flink cluster.
    bin/yarn-session.sh -nm flink-session-01 -d

Các tham số quan trọng:

  • -d: Chế độ tách (detached), cho phép session chạy nền sau khi thoát terminal.
  • -jm hoặc --jobManagerMemory: Thiết lập bộ nhớ cho JobManager (mặc định MB).
  • -nm hoặc --name: Tên hiển thị của ứng dụng trên giao diện YARN.
  • -qu hoặc --queue: Chỉ định hàng đợi YARN để chạy job.
  • -tm hoặc --taskManagerMemory: Thiết lập bộ nhớ cho mỗi TaskManager.

Cấu hình History Server

Mặc định, khi cụm Flink dừng lại, giao diện Web UI sẽ không còn truy cập được để kiểm tra lịch sử chạy job. History Server giải quyết vấn đề này bằng cách lưu trữ thông tin thống kê của các job đã hoàn thành (dù bình thường hay gặp lỗi) lên hệ thống tệp tin (như HDFS). Điều này giúp phục hồi lại Web UI để kiểm tra Checkpoint cuối cùng hay cấu hình runtime.

Các bước thiết lập:

  1. Tạo thư mục lưu trữ trên HDFS:
    hadoop fs -mkdir -p /flink/logs/archives
  2. Cấu hình trong flink-conf.yaml:
    # Đường dẫn lưu trữ archive của JobManager
    jobmanager.archive.fs.dir: hdfs://namenode:8020/flink/logs/archives
    
    # Cấu hình History Server
    historyserver.web.address: namenode
    historyserver.web.port: 8082
    historyserver.archive.fs.dir: hdfs://namenode:8020/flink/logs/archives
    historyserver.archive.fs.refresh-interval: 10000
    
  3. Khởi động/dừng History Server:
    bin/historyserver.sh start
    bin/historyserver.sh stop

Mechanism chia sẻ Slot (Slot Sharing)

Trong Flink, chỉ các tác vụ con (subtask) thuộc cùng một Slot Sharing Group mới có thể chạy chung trên một Slot. Các nhóm khác nhau sẽ được cô lập hoàn toàn và bắt buộc phải chiếm các Slot vật lý khác nhau.

Sử dụng DataGen Connector tạo dữ liệu giả

Kể từ phiên bản 1.11, Flink cung cấp kết nối DataGen để tạo dữ liệu ngẫu nhiên phục vụ việc kiểm thử hoặc đo hiệu năng mà không cần nguồn dữ liệu thật. Với Flink 1.17+, chúng ta sử dụng cách viết mới cho Source.

Dependency Maven:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-datagen</artifactId>
    <version>${flink.version}</version>
</dependency>

Ví dụ mã nguồn:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
import org.apache.flink.streaming.api.functions.source.datagen.GeneratorFunction;
import org.apache.flink.streaming.api.watermark.strategy.WatermarkStrategy;

public class SyntheticDataGen {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataGeneratorSource<String> generator = new DataGeneratorSource<>(
            new GeneratorFunction<Long, String>() {
                @Override
                public String map(Long value) {
                    return "Event-ID-" + value;
                }
            },
            Long.MAX_VALUE,
            org.apache.flink.streaming.api.functions.source.datagen.RateLimiterStrategy.perSecond(5),
            Types.STRING
        );

        DataStreamSource<String> stream = env.fromSource(
            generator, 
            WatermarkStrategy.noWatermarks(), 
            "random-generator"
        );

        stream.print();

        env.execute("Data Generator Test");
    }
}

Các hàm tổng hợp (Aggregation)

POJO (Plain Old Java Object) là các đối tượng Java đơn giản, không kế thừa từ các lớp đặc biệt hoặc bị framework xâm nhập.

Các toán tử tổng hợp thường dùng phải được gọi sau keyBy:

  • sum(): Cộng dồn giá trị của trường chỉ định.
  • min(): Tìm giá trị nhỏ nhất của trường chỉ định (các trường khác giữ giá trị của bản ghi đầu tiên).
  • max(): Tìm giá trị lớn nhất của trường chỉ định (giữ nguyên các trường khác như bản ghi đầu).
  • minBy(): Tìm bản ghi có giá trị nhỏ nhất (trả về toàn bộ bản ghi đó).
  • maxBy(): Tìm bản ghi có giá trị lớn nhất (trả về toàn bộ bản ghi đó).

Lập trình có trạng thái với Rich Function

Rich Function cung cấp các phương thức vòng đời (lifecycle methods) cho phép quản lý tài nguyên trước và sau khi xử lý dữ liệu:

  • open(): Được gọi 1 lần khi khởi tạo subtask (trước khi map() hoặc filter() chạy).
  • close(): Được gọi 1 lần khi subtask kết thúc, thường dùng để dọn dẹp tài nguyên.

Ví dụ:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RichFunctionLifecycleDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStream<Integer> data = env.fromElements(10, 20, 30, 40);

        data.map(new RichMapFunction<Integer, String>() {
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                System.out.println("Subtask " + getRuntimeContext().getIndexOfThisSubtask() + " started.");
            }

            @Override
            public String map(Integer value) throws Exception {
                return "Processed: " + (value * 2);
            }

            @Override
            public void close() throws Exception {
                super.close();
                System.out.println("Subtask " + getRuntimeContext().getIndexOfThisSubtask() + " finished.");
            }
        }).print();

        env.execute("Rich Function Example");
    }
}

Ghi dữ liệu ra Kafka (Sink)

Để ghi dữ liệu vào Kafka, bạn cần thêm dependency flink-connector-kafka và khởi động Kafka cluster.

1. Ghi record không có Key

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.kafka.clients.producer.ProducerConfig;

public class KafkaSinkProducer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Bắt buộc bật Checkpoint để hỗ trợ Exactly-Once
        env.enableCheckpointing(3000, CheckpointingMode.EXACTLY_ONCE);

        SingleOutputStreamOperator<String> sourceData = env.socketTextStream("localhost", 7777);

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("broker1:9092,broker2:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("output-topic")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build()
                )
                // Đảm bảo Exactly-Once
                .setDeliveryGuarantee(org.apache.flink.connector.base.DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("flink-tx-")
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 15 * 60 * 1000 + "")
                .build();

        sourceData.sinkTo(sink);

        env.execute("Write to Kafka");
    }
}

2. Ghi record có Key tùy chỉnh

Để xác định phân vùng (partition) dựa trên key, bạn cần triển khai giao thức serialization tùy chỉnh:

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.nio.charset.StandardCharsets;

public class KafkaKeyedSink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        SingleOutputStreamOperator<String> inputData = env.socketTextStream("localhost", 7777);

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("broker1:9092")
                .setRecordSerializer(new KafkaRecordSerializationSchema<String>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
                        String[] parts = element.split(",");
                        // Giả sử phần tử đầu là Key
                        byte[] key = parts[0].getBytes(StandardCharsets.UTF_8);
                        byte[] value = element.getBytes(StandardCharsets.UTF_8);
                        return new ProducerRecord<>("output-topic", key, value);
                    }
                })
                .setDeliveryGuarantee(org.apache.flink.connector.base.DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("keyed-tx-")
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 900000 + "")
                .build();

        inputData.sinkTo(sink);
        env.execute();
    }
}

Cửa sổ thời gian (Windowing) và KeyBy

Toán tử keyBy phân chia dữ liệu thành các luồng logic theo khóa, là bước bắt buộc trước khi thực hiện các trạng thái (state) hoặc tổng hợp. Sau keyBy, dữ liệu được xử lý trong các cửa sổ để giới hạn phạm vi tính toán.

Các loại cửa sổ phổ biến:

  • Tumbling Window: Cửa sổ xếp chồng, không chồng lấn.
  • Sliding Window: Cửa sổ trượt, có khoảng chồng lấn.
  • Session Window: Dựa trên khoảng thời gian hoạt động của session.
  • Global Window: Cửa sổ toàn cục.

Xử lý thời gian (Event Time) và Watermark

Trong xử lý luồng, Event Time là thời điểm sự kiện thực sự xảy ra (thường nằm trong dữ liệu). Flink sử dụng cơ chế Watermark để xử lý dữ liệu đến muộn (out-of-order). Watermark đóng vai trò như một đồng hồ nội bộ, báo hiệu cho Flink rằng không có dữ liệu có timestamp nhỏ hơn mốc thời gian này sẽ đến nữa.

Ví dụ: Nếu Watermark hiện tại là 10:00:05, các sự kiện có thời gian 10:00:00 (đã trễ) vẫn có thể được xử lý nếu nằm trong dung sai cho phép.

Kiểm điểm (Checkpointing)

Checkpoint là cơ chế sao lưu trạng thái của ứng dụng Flink vào một hệ thống lưu trữ ổn định (như HDFS). Điều này đảm bảo tính toàn vẹn của dữ liệu và cho phép khôi phục lại trạng thái trước đó nếu hệ thống gặp lỗi (fault tolerance).

Thẻ: apache-flink stream-processing Java kafka checkpoint

Đăng vào ngày 16 tháng 6 lúc 09:52