Thêm các phụ thuộc Maven cần thiết (pom.xml)
<!-- Build settings: compile for Java 8 with UTF-8 sources, and pin the Flink and Flink CDC
     versions referenced by the dependencies below. -->
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.16.0</flink.version>
    <flink-cdc.version>3.0.1</flink-cdc.version>
</properties>

<!-- In pom.xml every <dependency> must be nested inside a <dependencies> element. -->
<dependencies>
    <!-- Core DataStream API plus flink-clients, which is needed to run env.execute() locally.
         NOTE(review): when the job jar is submitted to a Flink cluster, these core Flink
         artifacts are normally declared with <scope>provided</scope>. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Table API bridge, planner and runtime.
         NOTE(review): the job in this guide uses only the DataStream API; confirm nothing else
         in the project needs these before removing them. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-loader</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Shared connector framework classes. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- NOTE(review): no JDBC sink appears in the job in this guide; this is only needed if one
         is added. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Flink CDC PostgreSQL connector. The 3.0.x line is still published under com.ververica.
         This is the shaded "sql" uber-jar, whose dependencies are relocated under
         com.ververica.cdc.connectors.shaded. For DataStream-only jobs the thin
         flink-connector-postgres-cdc artifact is the usual alternative. -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-postgres-cdc</artifactId>
        <version>${flink-cdc.version}</version>
    </dependency>

    <!-- RocksDB state backend used by the job's checkpoint configuration. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
Cấu hình cơ sở dữ liệu
1. Cấu hình máy chủ PostgreSQL (tệp postgresql.conf; cần khởi động lại PostgreSQL để các thay đổi như wal_level có hiệu lực)
# postgresql.conf settings required for Debezium logical decoding.
# wal_level, max_replication_slots and max_wal_senders only take effect after a server restart.
wal_level = logical
max_replication_slots = 20
max_wal_senders = 20
wal_sender_timeout = 180s
# pgoutput is PostgreSQL's built-in logical decoding output plugin (PostgreSQL 10+), so it does
# not need to be preloaded. Setting shared_preload_libraries here would also REPLACE any libraries
# already listed (e.g. pg_stat_statements). Only external plugins such as decoderbufs or wal2json
# need an entry here.
# shared_preload_libraries = 'pgoutput'
2. Cấu hình quyền truy cập cơ sở dữ liệu
-- Dedicated login role for the CDC job; the REPLICATION attribute allows it to use a logical replication slot.
CREATE ROLE cdc_user WITH LOGIN REPLICATION PASSWORD 'cdc@dip001';
-- Read access for the connector (the initial snapshot reads existing rows).
GRANT CONNECT ON DATABASE dip_basedata TO cdc_user;
GRANT USAGE ON SCHEMA public TO cdc_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO cdc_user;
-- REPLICA IDENTITY FULL: UPDATE/DELETE events carry the complete old row, not only the key columns.
ALTER TABLE public.t_common_sync_task REPLICA IDENTITY FULL;
-- Publication read by the pgoutput plugin; its name must match "publication.name" in the job.
-- NOTE: run as the table owner or a superuser (CREATE PUBLICATION requires table ownership).
CREATE PUBLICATION tasktable_cdc_pub FOR TABLE public.t_common_sync_task;
Viết mã nguồn
1. Tạo môi trường thực thi Flink
// Flink execution environment: exactly-once checkpoints every 30 s, RocksDB state, parallelism 1.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30000, CheckpointingMode.EXACTLY_ONCE);
// RocksDBStateBackend is deprecated since Flink 1.13. Its replacement separates the state backend
// (EmbeddedRocksDBStateBackend; 'true' enables incremental checkpoints) from checkpoint storage.
// Fully qualified to avoid depending on an import for the new class.
env.setStateBackend(
        new org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend(true));
// NOTE(review): /tmp is node-local and often cleared on reboot; use durable shared storage
// (HDFS/S3/NFS) for anything beyond local testing.
env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink/checkpoints");
// Fail the job only after more than 3 consecutive checkpoint failures.
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
// At least 10 s between the end of one checkpoint and the start of the next.
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
// Abort any checkpoint that takes longer than 10 minutes.
env.getCheckpointConfig().setCheckpointTimeout(600000);
env.setParallelism(1);
2. Cấu hình thuộc tính Debezium
// Options handed to the Debezium PostgreSQL connector. In the DataStream API they are passed through
// verbatim, so keys must NOT carry the "debezium." prefix (that prefix is only stripped for Flink SQL
// WITH options); prefixed keys are silently ignored.
Properties properties = new Properties();
// Replication slot and the publication created during the database setup step.
properties.setProperty("slot.name", "dbz_customerdb_europe_slot");
properties.setProperty("publication.name", "tasktable_cdc_pub");
// Debezium has no "publication.create.enable" option. Publication creation is controlled by
// "publication.autocreate.mode"; its default (all_tables) reuses an existing publication of this name.
// Keep the replication slot when the connector stops so the job can resume from it.
// (The previous key "debezium.slot.drop.on.top" was a typo that Debezium never read.)
properties.setProperty("slot.drop.on.stop", "false");
// NOTE(review): this looks like a MySQL-connector option; confirm the Postgres connector uses it.
properties.setProperty("include.schema.changes", "false");
// "never": stream new changes only, skipping the initial snapshot.
// "initial": snapshot existing rows first, then stream changes.
properties.setProperty("snapshot.mode", initReadIgnore ? "never" : "initial");
3. Tạo nguồn CDC
// SourceFunction-based PostgreSQL CDC source. Slot, publication and snapshot settings come from
// `properties`; each change event is emitted as a Debezium JSON string.
DebeziumSourceFunction<String> source = PostgreSQLSource.<String>builder()
        // Connection
        .hostname("10.17.16.204")
        .port(5432)
        .database("dip_basedata")
        .username("cdc_user")
        .password("cdc@dip001")
        // Logical decoding through the pgoutput plugin
        .decodingPluginName("pgoutput")
        // Capture scope: one table in schema public
        .schemaList("public")
        .tableList("public.t_common_sync_task")
        // Output format and extra Debezium options
        .deserializer(new JsonDebeziumDeserializationSchema())
        .debeziumProperties(properties)
        .build();
4. Khởi chạy tác vụ
// Attach the CDC source to the job graph as a single-instance operator named PG_SOURCE.
DataStreamSource<String> sourceStream = env.addSource(source, "PG_SOURCE").setParallelism(1);
// Debug sink: log every change event. The deserializer already produces a JSON string, so it is
// logged as-is. The previous JSONUtil round-trip was redundant and needed an undeclared (Hutool)
// dependency. The removed SinkFunction.super.invoke call only delegated to the deprecated no-op
// invoke(value).
SinkFunction<String> sink = new SinkFunction<String>() {
    @Override
    public void invoke(String value, Context context) {
        log.info("Dữ liệu được lắng nghe: {}", value);
    }
};
sourceStream.addSink(sink).name("PG_SINK");
// Submit and run the job under the name PG_JOB.
env.execute("PG_JOB");
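Kiểm tra kết quả: khi tác vụ khởi động, PostgresConnectorTask ghi ra log cấu hình như bên dưới. Lưu ý: log này được lấy từ một lần chạy có cấu hình hơi khác so với mã ở trên (ví dụ database.hostname và debezium.slot.drop.on.top).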
Starting PostgresConnectorTask with configuration:
connector.class = io.debezium.connector.postgresql.PostgresConnector
slot.name = dbz_customerdb_europe_slot
publication.name = tasktable_cdc_pub
schema.include.list = public
include.schema.changes = false
debezium.slot.drop.on.top = true
tombstones.on.delete = false
offset.storage.file.filename =
publication.create.enable = true
value.converter = com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverter
database.history.instance.name = b95c8e68-db7b-4b1a-ad9f-7472bb4d04f9
key.converter = com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverter
database.user = cdc_user
database.dbname = dip_basedata
offset.storage = com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore
database.server.name = postgres_cdc_source
offset.flush.timeout.ms = 5000
heartbeat.interval.ms = 300000
database.port = 5432
plugin.name = pgoutput
offset.flush.interval.ms = 9223372036854775807
internal.key.converter = com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverter
debezium.snapshot.mode = never
database.hostname = 108.19.16.205
database.password = ********
name = engine
internal.value.converter = com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverter
table.include.list = public.t_common_sync_task
snapshot.mode = never
database.history = com.ververica.cdc.debezium.internal.FlinkDatabaseSchemaHistory