Sử dụng StreamPark và Flink CDC để đồng bộ dữ liệu thời gian thực

Hướng dẫn chi tiết cài đặt và cấu hình StreamPark cùng Flink CDC

1. Cài đặt StreamPark

1. Chuẩn bị môi trường

  • Hệ điều hành: Linux (đề xuất CentOS 7+/Ubuntu 20.04+), macOS hoặc Windows (cần Docker).
  • Môi trường Java: JDK 11 hoặc 17 (openjdk-17-jdk), kiểm tra bằng lệnh:
java -version
  • Docker và Docker Compose (đề xuất sử dụng Docker):
  • Docker phiên bản ≥ 1.13.1, Docker Compose ≥ 1.28.0.
  • Cài đặt Docker Compose theo cách nhị phân:
sudo curl -L "https://get.daocloud.io/docker/compose/releases/download/2.16.0/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose

2. Triển khai StreamPark qua Docker

  • Tải xuống tệp cấu hình: Tạo các tệp docker-compose.yaml.env với nội dung sau:

docker-compose.yaml

version: '3.8'
services:
  streampark-console:
    image: apache/streampark:latest
    command: ${RUN_COMMAND}
    ports:
      - "10000:10000"
    env_file: .env
    volumes:
      - flink:/streampark/flink/${FLINK}
      - /var/run/docker.sock:/var/run/docker.sock
      - ~/.kube:/root/.kube:ro
    privileged: true
    restart: unless-stopped
  flink-jobmanager:
    image: ${FLINK_IMAGE}
    ports:
      - "8081:8081"
    command: jobmanager
    volumes:
      - flink:/opt/flink
    env_file: .env
    restart: unless-stopped
    privileged: true
  flink-taskmanager:
    image: ${FLINK_IMAGE}
    depends_on:
      - flink-jobmanager
    command: taskmanager
    deploy:
      replicas: 1
    env_file: .env
    restart: unless-stopped
    privileged: true
networks:
  streampark:
    driver: bridge
volumes:
  flink:

.env

TZ=Asia/Ho_Chi_Minh
SPRING_PROFILES_ACTIVE=mysql
SPRING_DATASOURCE_URL=jdbc:mysql://127.0.0.1:3306/streampark?useSSL=false&serverTimezone=UTC
SPRING_DATASOURCE_USERNAME=root
SPRING_DATASOURCE_PASSWORD=mat_khau_cua_ban
FLINK=flink1.18.1
FLINK_IMAGE=flink:1.18.1-scala_2.12
RUN_COMMAND='/bin/sh -c "wget -P lib https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.31/mysql-connector-j-8.0.31.jar && bash bin/start_docker"'
  • Khởi tạo cơ sở dữ liệu MySQL: Tải xuống script SQL của StreamPark và chạy:
wget https://github.com/apache/incubator-streampark/archive/refs/tags/v1.0.0.tar.gz
tar -xzvf v1.0.0.tar.gz
cd incubator-streampark-1.0.0/script/schema
mysql -u root -p < mysql-schema.sql
mysql -u root -p < mysql-data.sql
  • Khởi động StreamPark:
docker-compose up -d

Truy cập giao diện Web tại http://localhost:10000, tài khoản mặc định là admin/streampark.

2. Cấu hình Flink CDC

1. Tải xuống connector Flink CDC

  • Chọn connector tương thích với phiên bản Flink (ví dụ Flink 1.18.x dùng flink-sql-connector-mysql-cdc-3.6.0.jar).
  • Tải từ kho chính thức của Flink CDC và di chuyển JAR vào thư mục lib của StreamPark.

2. Cấu hình bảng nguồn MySQL

  • Đảm bảo MySQL đã bật tính năng Binlog (để bắt sự thay đổi trong dữ liệu):
-- Sửa file cấu hình my.cnf (hoặc my.ini)
[mysqld]
server-id=1
log_bin=mysql-bin
binlog_format=ROW
binlog_row_image=FULL

Khởi động lại dịch vụ MySQL và kiểm tra:

SHOW VARIABLES LIKE 'log_bin';

3. Đồng bộ dữ liệu thời gian thực từ MySQL sang Doris

1. Tạo công việc Flink SQL

  • Ví dụ mã SQL:
-- Bảng nguồn (MySQL CDC)
CREATE TABLE nguon_mysql (
  ma_so INT,
  ten NVARCHAR(255),
  PRIMARY KEY (ma_so) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = 'mat_khau_cua_ban',
  'database-name' = 'db_test',
  'table-name' = 'hoc_sinh'
);

-- Bảng đích (Doris)
CREATE TABLE dich_doris (
  ma_so INT,
  ten NVARCHAR(255)
) WITH (
  'connector' = 'doris',
  'fenodes' = '127.0.0.1:8030',
  'table.identifier' = 'db_test.hoc_sinh',
  'username' = 'root',
  'password' = ''
);

-- Thực hiện đồng bộ
INSERT INTO dich_doris SELECT * FROM nguon_mysql;

2. Triển khai công việc trên StreamPark

  • Tải lên JAR: Di chuyển các gói JAR của Flink CDC và Doris vào thư mục lib.
  • Tạo công việc:
  1. Đăng nhập vào bảng điều khiển StreamPark, chọn mô-đun Flink SQL.
  2. Dán đoạn mã SQL trên và cấu hình thông số công việc (như độ song song, khoảng cách Checkpoint).
  3. Chọn chế độ triển khai (ví dụ Kubernetes Application hoặc YARN Per-Job).
  4. Nhấn nút Triển khai để khởi chạy.

3. Giám sát và vận hành

  • Trạng thái công việc: Theo dõi trạng thái, nhật ký và chỉ báo trên bảng điều khiển StreamPark.
  • Phục hồi lỗi: Sử dụng Savepoint để đảm bảo tính nhất quán dữ liệu.
  • Tối ưu hiệu suất:
  • Điều chỉnh độ song song (ví dụ parallelism=4).
  • Kích hoạt Checkpoint: 'execution.checkpointing.interval' = '30s'.

4. Lưu ý quan trọng

  1. Tương thích phụ thuộc
  • Đảm bảo phiên bản connector Flink CDC phù hợp với Flink (ví dụ Flink 1.18.x dùng flink-cdc-3.6.0).
  • Nếu đích là Hive/Hologres, cần cấu hình thêm JDBC driver tương ứng.
  1. Nhất quán dữ liệu
  • Cấu hình schema.change.behavior=LENIENT trong YAML hoặc SQL để tự động điều chỉnh cấu trúc bảng.
  • Đảm bảo khóa chính giữa bảng nguồn và đích khớp nhau để tránh mất hoặc trùng lặp dữ liệu.
  1. Đề xuất cho môi trường sản xuất
  • Triển khai có sẵn cao: Sử dụng Kubernetes hoặc YARN cluster mode để tránh điểm hỏng duy nhất.
  • Giám sát cảnh báo: Tích hợp Prometheus + Grafana để giám sát độ trễ, thông lượng và các chỉ báo khác.

Thẻ: StreamPark FlinkCDC DockerCompose

Đăng vào ngày 7 tháng 6 lúc 06:44