Tích hợp Flink CDC với PostgreSQL: Lưu trữ và khôi phục checkpoint tùy chỉnh

Để tích hợp Flink CDC với PostgreSQL trong môi trường Spring Boot và thay thế cơ chế lưu trữ checkpoint mặc định (RocksDB/HDFS) bằng cơ sở dữ liệu quan hệ, cần triển khai một lớp lưu trữ checkpoint tùy chỉnh dựa trên giao diện CheckpointStorage của Flink. Giải pháp dưới đây tập trung vào việc lưu trữ metadata checkpoint và trạng thái liên quan (như LSN, offset) vào PostgreSQL, đồng thời đảm bảo khả năng khôi phục chính xác sau sự cố.

1. Thiết kế bảng lưu trữ checkpoint

Tạo bảng trong PostgreSQL để chứa thông tin checkpoint theo từng job:
CREATE TABLE flink_checkpoint_meta (
    id BIGSERIAL PRIMARY KEY,
    job_key VARCHAR(128) NOT NULL,
    checkpoint_seq BIGINT NOT NULL,
    lsn TEXT,  -- Logical Sequence Number từ PostgreSQL WAL
    snapshot_time TIMESTAMPTZ DEFAULT NOW(),
    state_bytes BYTEA,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    CONSTRAINT uk_job_seq UNIQUE (job_key, checkpoint_seq)
);

CREATE INDEX idx_job_latest ON flink_checkpoint_meta (job_key, checkpoint_seq DESC);

2. Triển khai lớp CheckpointStorage tùy chỉnh

Thay vì kế thừa toàn bộ luồng FsCheckpointStorageAccess, ta xây dựng một implementation thuần dựa trên JDBC — tối giản và kiểm soát trực tiếp luồng ghi/đọc:
public class PgCheckpointStorage implements CheckpointStorage {

    private final DataSource dataSource;
    private final String jobKey;

    public PgCheckpointStorage(DataSource dataSource, String jobKey) {
        this.dataSource = dataSource;
        this.jobKey = jobKey;
    }

    @Override
    public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException {
        return new PgCheckpointLocation(checkpointId, jobKey, dataSource);
    }

    @Override
    public CheckpointStorageLocation resolveCheckpointStorageLocation(long checkpointId, String externalPointer) {
        return new PgCheckpointLocation(checkpointId, jobKey, dataSource);
    }

    @Override
    public boolean supportsHighlyAvailableStorage() {
        return true;
    }

    @Override
    public Path getDefaultCheckpointPath() {
        return new Path("pg://checkpoint/" + jobKey);
    }

    static class PgCheckpointLocation implements CheckpointStorageLocation {

        private final long seq;
        private final String key;
        private final DataSource ds;

        PgCheckpointLocation(long seq, String key, DataSource ds) {
            this.seq = seq;
            this.key = key;
            this.ds = ds;
        }

        @Override
        public Path getCheckpointPath() {
            return new Path("pg://checkpoint/" + key + "/" + seq);
        }

        @Override
        public CheckpointStorageAccess createCheckpointStorageAccess() {
            return new PgCheckpointAccess(seq, key, ds);
        }
    }

    static class PgCheckpointAccess implements CheckpointStorageAccess {

        private final long seq;
        private final String key;
        private final DataSource ds;

        PgCheckpointAccess(long seq, String key, DataSource ds) {
            this.seq = seq;
            this.key = key;
            this.ds = ds;
        }

        @Override
        public void storeMetadata(byte[] bytes) throws IOException {
            String sql = "INSERT INTO flink_checkpoint_meta (job_key, checkpoint_seq, state_bytes) VALUES (?, ?, ?) " +
                         "ON CONFLICT (job_key, checkpoint_seq) DO UPDATE SET state_bytes = EXCLUDED.state_bytes";
            try (Connection conn = ds.getConnection();
                 PreparedStatement ps = conn.prepareStatement(sql)) {
                ps.setString(1, key);
                ps.setLong(2, seq);
                ps.setBytes(3, bytes);
                ps.executeUpdate();
            } catch (SQLException e) {
                throw new IOException("Failed to persist checkpoint metadata", e);
            }
        }

        @Override
        public byte[] loadMetadata() throws IOException {
            String sql = "SELECT state_bytes FROM flink_checkpoint_meta WHERE job_key = ? AND checkpoint_seq = ?";
            try (Connection conn = ds.getConnection();
                 PreparedStatement ps = conn.prepareStatement(sql)) {
                ps.setString(1, key);
                ps.setLong(2, seq);
                try (ResultSet rs = ps.executeQuery()) {
                    if (rs.next()) {
                        return rs.getBytes("state_bytes");
                    }
                    return null;
                }
            } catch (SQLException e) {
                throw new IOException("Failed to load checkpoint metadata", e);
            }
        }

        @Override
        public void dispose() {}
    }
}

3. Cấu hình môi trường Flink trong Spring Boot

Sử dụng EmbeddedRocksDBStateBackend cho trạng thái nội bộ, còn PostgreSQL chỉ giữ metadata và offset — đảm bảo hiệu năng cao và tính nhất quán:
@Configuration
public class FlinkEnvironmentConfig {

    @Bean
    public StreamExecutionEnvironment streamingEnv(
            @Value("${flink.checkpoint.interval:30000}") long intervalMs,
            DataSource pgDataSource) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Bật checkpoint với khoảng cách cố định
        env.enableCheckpointing(intervalMs, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
        env.getCheckpointConfig().enableUnalignedCheckpoints(true);

        // Dùng RocksDB cho state backend — tối ưu I/O
        env.setStateBackend(new EmbeddedRocksDBStateBackend());

        // Gắn storage tùy chỉnh cho checkpoint metadata
        env.setCheckpointStorage(new PgCheckpointStorage(pgDataSource, "cdc-postgres-inventory"));

        return env;
    }
}

4. Khởi tạo nguồn CDC và khôi phục từ checkpoint gần nhất

Trước khi khởi chạy job, truy vấn PostgreSQL để lấy checkpoint mới nhất và thiết lập lại vị trí đọc từ LSN hoặc offset tương ứng:
public class InventoryCdcJobLauncher {

    private final StreamExecutionEnvironment env;
    private final JdbcTemplate jdbcTemplate;

    public InventoryCdcJobLauncher(StreamExecutionEnvironment env, JdbcTemplate jdbcTemplate) {
        this.env = env;
        this.jdbcTemplate = jdbcTemplate;
    }

    public void run() throws Exception {
        // Tìm checkpoint mới nhất cho job hiện tại
        Long latestSeq = jdbcTemplate.queryForObject(
            "SELECT MAX(checkpoint_seq) FROM flink_checkpoint_meta WHERE job_key = ?",
            Long.class,
            "cdc-postgres-inventory"
        );

        // Tùy chọn: truyền LSN từ checkpoint để khởi tạo source ở đúng vị trí
        String initialLsn = null;
        if (latestSeq != null) {
            initialLsn = jdbcTemplate.queryForObject(
                "SELECT lsn FROM flink_checkpoint_meta WHERE job_key = ? AND checkpoint_seq = ?",
                String.class,
                "cdc-postgres-inventory", latestSeq
            );
        }

        PostgresSource<String> source = PostgresSource.<String>builder()
            .hostname("pg-host")
            .port(5432)
            .database("inventory_db")
            .username("cdc_user")
            .password("secret")
            .schemaList("inventory")
            .tableList("inventory.customers", "inventory.orders")
            .deserializer(new JsonDebeziumDeserializationSchema())
            .slotName("flink_cdc_slot")
            .publicationName("flink_cdc_pub")
            .debeziumProperties(Map.of(
                "plugin.name", "pgoutput",
                "snapshot.mode", "exported"
            ))
            .build();

        DataStreamSource<String> stream = env.fromSource(
            source,
            WatermarkStrategy.noWatermarks(),
            "PostgreSQL CDC Source"
        );

        stream.print("CDC-Event");

        env.execute("Inventory Sync Job with PG-backed Checkpoint");
    }
}

5. Xử lý các vấn đề thực tiễn

  • Nén dữ liệu trạng thái: Trước khi lưu vào state_bytes, áp dụng DeflaterOutputStream để giảm kích thước — đặc biệt hữu ích với cấu trúc offset phức tạp.
  • Dọn dẹp tự động: Sử dụng ScheduledTask trong Spring để xóa các bản ghi cũ hơn N checkpoint:
@Scheduled(fixedDelay = 3600000) // mỗi giờ
public void cleanupOldCheckpoints() {
    String sql = "DELETE FROM flink_checkpoint_meta WHERE (job_key, checkpoint_seq) IN (" +
                 "  SELECT job_key, checkpoint_seq FROM (" +
                 "    SELECT job_key, checkpoint_seq, ROW_NUMBER() OVER (PARTITION BY job_key ORDER BY checkpoint_seq DESC) AS rn" +
                 "    FROM flink_checkpoint_meta" +
                 "  ) ranked WHERE rn > 7" +
                 ")";
    jdbcTemplate.update(sql);
}
  • Khả năng chịu lỗi: Kết nối tới PostgreSQL nên được cấu hình với failover URL (ví dụ: jdbc:postgresql://pg1:5432,pg2:5432/db?targetServerType=preferSecondary) hoặc tích hợp với PgBouncer.

Thẻ: flink-cdc PostgreSQL checkpoint rocksdb spring-boot

Đăng vào ngày 24 tháng 6 lúc 20:30