Quản lý offset trong Debezium Embedded CDC

Việc quản lý offset là yếu tố then chốt đảm bảo tính nhất quán và độ tin cậy khi sử dụng Debezium Embedded để bắt thay đổi dữ liệu. Dưới đây là phân tích chi tiết về cơ chế lưu trữ, xử lý tùy chỉnh, phục hồi lỗi và chiến lược reset, kèm theo ví dụ mã nguồn và tình huống thực tế.

Cơ chế lưu trữ offset

1. Các backend lưu trữ sẵn có

(a) Lưu vào file (FileOffsetBackingStore)

  • Phù hợp: Ứng dụng đơn máy hoặc nhẹ.
  • Cấu hình:
Configuration setup = Configuration.create()
    .with(EmbeddedEngine.OFFSET_STORAGE, FileOffsetBackingStore.class.getName())
    .with("offset.storage.file.filename", "/data/offsets.dat")
    .build();
  • Ưu điểm: Dữ liệu tồn tại sau restart; cần NFS nếu chạy nhiều instance.

(b) Lưu vào Kafka (KafkaOffsetBackingStore)

  • Phù hợp: Hệ thống yêu cầu HA hoặc đã dùng Kafka.
  • Cấu hình:
Configuration setup = Configuration.create()
    .with(EmbeddedEngine.OFFSET_STORAGE, KafkaOffsetBackingStore.class.getName())
    .with("offset.storage.topic", "cdc-offsets")
    .with("offset.storage.replication.factor", 3)
    .build();
  • Ưu điểm: Tận dụng cơ chế replication của Kafka; hỗ trợ coordination giữa các consumer.

(c) Lưu trong bộ nhớ (mặc định)

  • Phù hợp: Môi trường test hoặc không cần bền vững.
  • Lưu ý: Offset mất khi restart engine.

2. Tự xây dựng backend lưu trữ

Triển khai giao diện OffsetBackingStore để lưu offset vào hệ thống riêng (ví dụ: PostgreSQL):

public class PostgreOffsetStore implements OffsetBackingStore {
    private Connection conn;

    @Override
    public void start() {
        conn = DriverManager.getConnection("jdbc:postgresql://localhost/cdc");
    }

    @Override
    public void stop() {
        if (conn != null) conn.close();
    }

    @Override
    public Map<String, Object> read() {
        try (PreparedStatement stmt = conn.prepareStatement(
            "SELECT data FROM offset_store WHERE task_id = ?")) {
            stmt.setString(1, taskId);
            ResultSet rs = stmt.executeQuery();
            if (rs.next()) {
                return deserialize(rs.getString("data"));
            }
        }
        return Collections.emptyMap();
    }

    @Override
    public void write(Map<String, Object> offsets) {
        String serialized = serialize(offsets);
        try (PreparedStatement stmt = conn.prepareStatement(
            "INSERT INTO offset_store VALUES (?, ?) ON CONFLICT (task_id) DO UPDATE SET data = ?")) {
            stmt.setString(1, taskId);
            stmt.setString(2, serialized);
            stmt.setString(3, serialized);
            stmt.executeUpdate();
        }
    }
}

Xử lý offset tùy chỉnh

1. Kiểm soát thời điểm commit

Tạo policy chỉ commit khi xử lý thành công:

public class ConditionalCommitPolicy implements OffsetCommitPolicy {
    private volatile boolean shouldCommit = false;

    public void markSuccess() { this.shouldCommit = true; }
    public void markFailure() { this.shouldCommit = false; }

    @Override
    public boolean shouldCommit(Map<String, Object> current, Map<String, Object> last) {
        boolean result = shouldCommit;
        shouldCommit = false; // reset sau mỗi lần kiểm tra
        return result;
    }
}

2. Theo dõi sự kiện commit

Gắn listener để log hoặc cảnh báo:

engine.setOffsetCommitListener((offsets, success) -> {
    if (success) {
        System.out.println("[OK] Đã lưu offset: " + offsets);
    } else {
        System.err.println("[LỖI] Không thể lưu offset: " + offsets);
    }
});

Phục hồi sau sự cố

1. Quy trình tự động

  1. Khởi động → đọc offset từ storage.
  2. Dùng offset để xác định vị trí trong transaction log (binlog, WAL...).
  3. Lọc bản ghi trùng nhờ ID giao dịch hoặc khóa duy nhất.

2. Xử lý thủ công

(a) File offset bị hỏng

// Reset về vị trí cụ thể
Map<String, Object> recoveryPoint = new HashMap<>();
recoveryPoint.put("file", "mysql-bin.000123");
recoveryPoint.put("position", 789);
engine.setManualOffset(recoveryPoint);

(b) Transaction log bị xoá

engine.setErrorHandler((error, msg) -> {
    if (error instanceof LogPositionLostException) {
        engine.setManualOffset(Map.of("reset", "earliest"));
    }
});

Chiến lược reset offset

1. Reset trong lúc chạy

// Về đầu stream
engine.setManualOffset(Map.of("reset", "earliest"));

// Bỏ qua dữ liệu cũ
engine.setManualOffset(Map.of("reset", "latest"));

// Về thời điểm cụ thể
Map<String, Object> timeBased = new HashMap<>();
timeBased.put("timestamp", Instant.parse("2023-06-01T00:00:00Z").toEpochMilli());
engine.setManualOffset(timeBased);

2. Reset qua cấu hình khởi động

Configuration setup = Configuration.create()
    .with("offset.reset.strategy", "earliest")  // hoặc "latest"
    .build();

3. Kết hợp với snapshot

// Bắt đầu từ bản ghi có ID=500 trong bảng orders
Map<String, Object> snapshotStart = Map.of(
    "snapshot.starting.position", "orders:500"
);
engine.setManualOffset(snapshotStart);

Thực hành tốt nhất

  • Production: Dùng Kafka làm storage, replication factor ≥ 3.
  • Giám sát: Theo dõi metric lag qua JMX; cảnh báo khi commit thất bại.
  • Tách luồng xử lý: Dùng thread pool riêng cho việc commit offset:
ExecutorService commitPool = Executors.newFixedThreadPool(1);
engine.setOffsetCommitExecutor(commitPool);
  • Kiểm thử: Mô phỏng crash, failover DB để xác minh khả năng phục hồi.
Tình huốngGiải pháp đề xuất
Ứng dụng đơn giảnFile storage + backup định kỳ
Hệ thống phân tánKafka storage + replication
Cần kiểm soát chính xácCustom store + database
Phục hồi lịch sửSnapshot + timestamp reset

Triển khai commit offset sau khi xử lý thành công

Để đảm bảo tính toàn vẹn dữ liệu, ta cần commit offset chỉ khi event đã được xử lý thành công ở hệ thống đích.

Bước 1: Tạo policy commit điều kiện

public class SuccessDrivenPolicy implements OffsetCommitPolicy {
    private final AtomicBoolean successFlag = new AtomicBoolean(false);

    public void onSuccess() { successFlag.set(true); }
    public void onFailure() { successFlag.set(false); }

    @Override
    public boolean shouldCommit(Map<String, Object> curr, Map<String, Object> prev) {
        return successFlag.getAndSet(false);
    }
}

Bước 2: Gắn vào engine và xử lý event

Properties config = new Properties();
config.setProperty("offset.commit.policy", SuccessDrivenPolicy.class.getName());
// ... các cấu hình khác

SuccessDrivenPolicy commitPolicy = new SuccessDrivenPolicy();

DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
    .using(config)
    .notifying(event -> {
        try {
            forwardToDownstream(event);  // Gửi đến Kafka, Elasticsearch...
            commitPolicy.onSuccess();    // Đánh dấu thành công
        } catch (Exception ex) {
            commitPolicy.onFailure();    // Đánh dấu thất bại
            throw ex;                    // Engine sẽ retry
        }
    })
    .build();

Bước 3: Xác minh hành vi

  • Nếu forwardToDownstream() thành công → offset được commit.
  • Nếu xảy ra exception → offset giữ nguyên → event được xử lý lại sau restart.

Tối ưu nâng cao

  • Batch commit: Commit sau mỗi N event thành công để giảm I/O.
  • Dead-letter queue: Ghi event lỗi vào topic riêng để xử lý sau.
  • Metrics: Đo lường throughput và latency qua Micrometer hoặc Prometheus.

Thẻ: Debezium CDC kafka embedded-engine offset-management

Đăng vào ngày 16 tháng 6 lúc 19:45