Thiết lập và triển khai Flink CDC với PostgreSQL

Khai báo các thư viện phụ thuộc cần thiết (Maven, tệp pom.xml)


    <!-- Build properties: Java 8 bytecode, UTF-8 sources, and the Flink / Flink CDC versions
         referenced by the dependency declarations via ${flink.version} and ${flink-cdc.version}. -->
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.16.0</flink.version>
        <!-- NOTE(review): confirm this Flink CDC release supports Flink 1.16
             (see the Flink CDC version compatibility table) before upgrading either side. -->
        <flink-cdc.version>3.0.1</flink-cdc.version>
    </properties>

    <!-- Core Flink DataStream API plus the client used to run/submit the job.
         NOTE(review): when deploying to a Flink cluster these are usually declared with
         <scope>provided</scope>, because the cluster already ships them. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Table API bridge, planner and runtime. The job code in this article only uses the
         DataStream API; NOTE(review): these look removable unless the Table/SQL API is used elsewhere. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-loader</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Shared connector base classes and the JDBC connector.
         NOTE(review): the JDBC connector is not used by the job code shown in this article. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Postgres CDC connector from Flink CDC (com.ververica groupId). This is the shaded "sql"
         jar, consistent with the shaded class names in the startup log
         (com.ververica.cdc.connectors.shaded...). For DataStream-only jobs the non-shaded
         flink-connector-postgres-cdc artifact is the usual choice; verify before switching. -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-postgres-cdc</artifactId>
        <version>${flink-cdc.version}</version>
    </dependency>
    <!-- RocksDB state backend classes for the job's state backend configuration. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb</artifactId>
        <version>${flink.version}</version>
    </dependency>

Cấu hình cơ sở dữ liệu

1. Cấu hình tham số PostgreSQL (postgresql.conf; thay đổi wal_level cần khởi động lại PostgreSQL)


# Logical decoding is required for CDC; changing wal_level requires a PostgreSQL restart.
wal_level = logical
# Upper bounds on replication slots and WAL sender processes; each CDC source typically
# holds one slot and one sender connection while running.
max_replication_slots = 20
max_wal_senders = 20
# Terminate replication connections that stay inactive longer than this.
wal_sender_timeout = 180s
# pgoutput is PostgreSQL's built-in logical decoding plugin (PostgreSQL 10+). The WAL sender
# loads it on demand, so it does not need to be preloaded. Enabling this line would also
# replace any libraries already listed in shared_preload_libraries (e.g. pg_stat_statements).
# shared_preload_libraries = 'pgoutput'

2. Cấu hình quyền truy cập cơ sở dữ liệu (thực hiện bằng tài khoản quản trị hoặc chủ sở hữu bảng)


-- Dedicated login role for CDC. REPLICATION allows it to open logical replication connections.
-- (Equivalent to CREATE USER followed by ALTER ROLE ... REPLICATION.)
CREATE ROLE cdc_user WITH LOGIN REPLICATION PASSWORD 'cdc@dip001';
-- Let the role connect and read existing tables in schema public (needed for the initial snapshot).
-- Note: ON ALL TABLES only covers tables that exist when this statement runs.
GRANT CONNECT ON DATABASE dip_basedata TO cdc_user;
GRANT USAGE ON SCHEMA public TO cdc_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO cdc_user;
-- The two statements below must be run by the table owner or a superuser.
-- Log full old-row images so UPDATE/DELETE change events carry every "before" column value.
ALTER TABLE public.t_common_sync_task REPLICA IDENTITY FULL;
-- Publication read through the pgoutput plugin; its name must match publication.name in the job.
CREATE PUBLICATION tasktable_cdc_pub FOR TABLE public.t_common_sync_task;

Viết mã nguồn

1. Tạo môi trường thực thi Flink


// Execution environment: exactly-once checkpoints every 30 s with RocksDB-backed state.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30000);
// RocksDBStateBackend is deprecated since Flink 1.13. The equivalent configuration is the
// embedded RocksDB state backend (true = incremental checkpoints) plus a separately
// configured checkpoint storage location.
env.setStateBackend(
        new org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend(true));
// NOTE: file:///tmp is local to each machine and is often cleared on reboot; use a durable,
// shared filesystem (e.g. HDFS or S3) for checkpoints outside of local testing.
env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink/checkpoints");
// Tolerate up to 3 consecutive checkpoint failures before failing the job.
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Wait at least 10 s after one checkpoint completes before starting the next.
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
// Abort any checkpoint that has not completed within 10 minutes.
env.getCheckpointConfig().setCheckpointTimeout(600000);
env.setParallelism(1);

2. Cấu hình thuộc tính Debezium


// Debezium connector properties. PostgreSQLSource hands these keys to Debezium verbatim, so
// they must use Debezium's own option names. The "debezium." prefix is only stripped for
// Flink SQL DDL options; prefixed keys set here are ignored by Debezium.
Properties properties = new Properties();
// Replication slot and publication the connector reads from.
properties.setProperty("slot.name", "dbz_customerdb_europe_slot");
properties.setProperty("publication.name", "tasktable_cdc_pub");
// Debezium's switch for creating a missing publication is publication.autocreate.mode;
// "filtered" creates it only for the captured tables. Creating a publication requires table
// ownership, so in practice it is created up front by an administrator.
properties.setProperty("publication.autocreate.mode", "filtered");
// Keep the replication slot when the connector stops so the job can resume from its position.
properties.setProperty("slot.drop.on.stop", "false");
properties.setProperty("include.schema.changes", "false");

// initReadIgnore (defined elsewhere) selects the startup behavior: "never" streams only new
// changes from the slot, "initial" snapshots the existing rows first and then streams changes.
properties.setProperty("snapshot.mode", initReadIgnore ? "never" : "initial");

3. Tạo nguồn CDC


// CDC source for public.t_common_sync_task, decoded with pgoutput; each change event is
// emitted as a JSON string. Builder setters are order-independent; build() assembles them.
// NOTE(review): credentials are hard-coded for the tutorial; load them from configuration in real use.
DebeziumSourceFunction<String> source = PostgreSQLSource.<String>builder()
    // Connection
    .hostname("10.17.16.204")
    .port(5432)
    .database("dip_basedata")
    .username("cdc_user")
    .password("cdc@dip001")
    // Capture scope
    .schemaList("public")
    .tableList("public.t_common_sync_task")
    // Decoding and output format
    .decodingPluginName("pgoutput")
    .debeziumProperties(properties)
    .deserializer(new JsonDebeziumDeserializationSchema())
    .build();

4. Khởi chạy tác vụ


// Attach the CDC source as a single instance and a sink that logs every change event.
DataStreamSource<String> sourceStream = env.addSource(source, "PG_SOURCE").setParallelism(1);
// NOTE(review): assumes `log` is a static logger. If it were an instance field, this anonymous
// class would capture the enclosing object, which would then have to be serializable.
SinkFunction<String> sink = new SinkFunction<String>() {
    @Override
    public void invoke(String value, Context context) {
        // The JSON deserializer already produces a JSON string, so it is logged as-is rather
        // than re-serialized. The inherited default invoke(...) is a no-op, so it is not called.
        log.info("Dữ liệu được lắng nghe: {}", value);
    }
};
sourceStream.addSink(sink).name("PG_SINK");
// Submit and run the job under the name PG_JOB.
env.execute("PG_JOB");

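Kiểm tra kết quả

Khi tác vụ khởi động, PostgresConnectorTask in ra cấu hình Debezium thực tế như dưới đây. Trong lần chạy này snapshot.mode = never, tức là initReadIgnore = true. Lưu ý: một số giá trị trong nhật ký không khớp với mã ở trên. Ví dụ, database.hostname là 108.19.16.205 thay vì 10.17.16.204, và debezium.slot.drop.on.top là true thay vì false. Có lẽ nhật ký được lấy từ một môi trường hoặc phiên bản mã khác.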


Starting PostgresConnectorTask with configuration:
connector.class = io.debezium.connector.postgresql.PostgresConnector
slot.name = dbz_customerdb_europe_slot
publication.name = tasktable_cdc_pub
schema.include.list = public
include.schema.changes = false
debezium.slot.drop.on.top = true
tombstones.on.delete = false
offset.storage.file.filename =
publication.create.enable = true
value.converter = com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverter
database.history.instance.name = b95c8e68-db7b-4b1a-ad9f-7472bb4d04f9
key.converter = com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverter
database.user = cdc_user
database.dbname = dip_basedata
offset.storage = com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore
database.server.name = postgres_cdc_source
offset.flush.timeout.ms = 5000
heartbeat.interval.ms = 300000
database.port = 5432
plugin.name = pgoutput
offset.flush.interval.ms = 9223372036854775807
internal.key.converter = com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverter
debezium.snapshot.mode = never
database.hostname = 108.19.16.205
database.password = ********
name = engine
internal.value.converter = com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverter
table.include.list = public.t_common_sync_task
snapshot.mode = never
database.history = com.ververica.cdc.debezium.internal.FlinkDatabaseSchemaHistory

Thẻ: Flink CDC PostgreSQL Debezium rocksdb

Đăng vào ngày 20 tháng 6 lúc 06:47