Hướng dẫn chi tiết cài đặt và cấu hình StreamPark cùng Flink CDC
1. Cài đặt StreamPark
1. Chuẩn bị môi trường
- Hệ điều hành: Linux (đề xuất CentOS 7+/Ubuntu 20.04+), macOS hoặc Windows (cần Docker).
- Môi trường Java: JDK 11 hoặc 17 (
openjdk-17-jdk), kiểm tra bằng lệnh:
java -version
- Docker và Docker Compose (đề xuất sử dụng Docker):
- Docker phiên bản ≥ 1.13.1, Docker Compose ≥ 1.28.0.
- Cài đặt Docker Compose theo cách nhị phân:
sudo curl -L "https://get.daocloud.io/docker/compose/releases/download/2.16.0/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose
2. Triển khai StreamPark qua Docker
- Tải xuống tệp cấu hình:
Tạo các tệp
docker-compose.yamlvà.envvới nội dung sau:
docker-compose.yaml
version: '3.8'
services:
streampark-console:
image: apache/streampark:latest
command: ${RUN_COMMAND}
ports:
- "10000:10000"
env_file: .env
volumes:
- flink:/streampark/flink/${FLINK}
- /var/run/docker.sock:/var/run/docker.sock
- ~/.kube:/root/.kube:ro
privileged: true
restart: unless-stopped
flink-jobmanager:
image: ${FLINK_IMAGE}
ports:
- "8081:8081"
command: jobmanager
volumes:
- flink:/opt/flink
env_file: .env
restart: unless-stopped
privileged: true
flink-taskmanager:
image: ${FLINK_IMAGE}
depends_on:
- flink-jobmanager
command: taskmanager
deploy:
replicas: 1
env_file: .env
restart: unless-stopped
privileged: true
networks:
streampark:
driver: bridge
volumes:
flink:
.env
TZ=Asia/Ho_Chi_Minh
SPRING_PROFILES_ACTIVE=mysql
SPRING_DATASOURCE_URL=jdbc:mysql://127.0.0.1:3306/streampark?useSSL=false&serverTimezone=UTC
SPRING_DATASOURCE_USERNAME=root
SPRING_DATASOURCE_PASSWORD=mat_khau_cua_ban
FLINK=flink1.18.1
FLINK_IMAGE=flink:1.18.1-scala_2.12
RUN_COMMAND='/bin/sh -c "wget -P lib https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.31/mysql-connector-j-8.0.31.jar && bash bin/start_docker"'
- Khởi tạo cơ sở dữ liệu MySQL: Tải xuống script SQL của StreamPark và chạy:
wget https://github.com/apache/incubator-streampark/archive/refs/tags/v1.0.0.tar.gz
tar -xzvf v1.0.0.tar.gz
cd incubator-streampark-1.0.0/script/schema
mysql -u root -p < mysql-schema.sql
mysql -u root -p < mysql-data.sql
- Khởi động StreamPark:
docker-compose up -d
Truy cập giao diện Web tại http://localhost:10000, tài khoản mặc định là admin/streampark.
2. Cấu hình Flink CDC
1. Tải xuống connector Flink CDC
- Chọn connector tương thích với phiên bản Flink (ví dụ Flink 1.18.x dùng
flink-sql-connector-mysql-cdc-3.6.0.jar). - Tải từ kho chính thức của Flink CDC và di chuyển JAR vào thư mục
libcủa StreamPark.
2. Cấu hình bảng nguồn MySQL
- Đảm bảo MySQL đã bật tính năng Binlog (để bắt sự thay đổi trong dữ liệu):
-- Sửa file cấu hình my.cnf (hoặc my.ini)
[mysqld]
server-id=1
log_bin=mysql-bin
binlog_format=ROW
binlog_row_image=FULL
Khởi động lại dịch vụ MySQL và kiểm tra:
SHOW VARIABLES LIKE 'log_bin';
3. Đồng bộ dữ liệu thời gian thực từ MySQL sang Doris
1. Tạo công việc Flink SQL
- Ví dụ mã SQL:
-- Bảng nguồn (MySQL CDC)
CREATE TABLE nguon_mysql (
ma_so INT,
ten NVARCHAR(255),
PRIMARY KEY (ma_so) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = 'mat_khau_cua_ban',
'database-name' = 'db_test',
'table-name' = 'hoc_sinh'
);
-- Bảng đích (Doris)
CREATE TABLE dich_doris (
ma_so INT,
ten NVARCHAR(255)
) WITH (
'connector' = 'doris',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'db_test.hoc_sinh',
'username' = 'root',
'password' = ''
);
-- Thực hiện đồng bộ
INSERT INTO dich_doris SELECT * FROM nguon_mysql;
2. Triển khai công việc trên StreamPark
- Tải lên JAR: Di chuyển các gói JAR của Flink CDC và Doris vào thư mục
lib. - Tạo công việc:
- Đăng nhập vào bảng điều khiển StreamPark, chọn mô-đun Flink SQL.
- Dán đoạn mã SQL trên và cấu hình thông số công việc (như độ song song, khoảng cách Checkpoint).
- Chọn chế độ triển khai (ví dụ
Kubernetes ApplicationhoặcYARN Per-Job). - Nhấn nút Triển khai để khởi chạy.
3. Giám sát và vận hành
- Trạng thái công việc: Theo dõi trạng thái, nhật ký và chỉ báo trên bảng điều khiển StreamPark.
- Phục hồi lỗi: Sử dụng Savepoint để đảm bảo tính nhất quán dữ liệu.
- Tối ưu hiệu suất:
- Điều chỉnh độ song song (ví dụ
parallelism=4). - Kích hoạt Checkpoint:
'execution.checkpointing.interval' = '30s'.
4. Lưu ý quan trọng
- Tương thích phụ thuộc
- Đảm bảo phiên bản connector Flink CDC phù hợp với Flink (ví dụ Flink 1.18.x dùng
flink-cdc-3.6.0). - Nếu đích là Hive/Hologres, cần cấu hình thêm JDBC driver tương ứng.
- Nhất quán dữ liệu
- Cấu hình
schema.change.behavior=LENIENTtrong YAML hoặc SQL để tự động điều chỉnh cấu trúc bảng. - Đảm bảo khóa chính giữa bảng nguồn và đích khớp nhau để tránh mất hoặc trùng lặp dữ liệu.
- Đề xuất cho môi trường sản xuất
- Triển khai có sẵn cao: Sử dụng Kubernetes hoặc YARN cluster mode để tránh điểm hỏng duy nhất.
- Giám sát cảnh báo: Tích hợp Prometheus + Grafana để giám sát độ trễ, thông lượng và các chỉ báo khác.