Việc quản lý offset là yếu tố then chốt đảm bảo tính nhất quán và độ tin cậy khi sử dụng Debezium Embedded để bắt thay đổi dữ liệu. Dưới đây là phân tích chi tiết về cơ chế lưu trữ, xử lý tùy chỉnh, phục hồi lỗi và chiến lược reset, kèm theo ví dụ mã nguồn và tình huống thực tế.
Cơ chế lưu trữ offset
1. Các backend lưu trữ sẵn có
(a) Lưu vào file (FileOffsetBackingStore)
- Phù hợp: Ứng dụng đơn máy hoặc nhẹ.
- Cấu hình:
Configuration setup = Configuration.create()
.with(EmbeddedEngine.OFFSET_STORAGE, FileOffsetBackingStore.class.getName())
.with("offset.storage.file.filename", "/data/offsets.dat")
.build();
- Ưu điểm: Dữ liệu tồn tại sau restart; cần NFS nếu chạy nhiều instance.
(b) Lưu vào Kafka (KafkaOffsetBackingStore)
- Phù hợp: Hệ thống yêu cầu HA hoặc đã dùng Kafka.
- Cấu hình:
Configuration setup = Configuration.create()
.with(EmbeddedEngine.OFFSET_STORAGE, KafkaOffsetBackingStore.class.getName())
.with("offset.storage.topic", "cdc-offsets")
.with("offset.storage.replication.factor", 3)
.build();
- Ưu điểm: Tận dụng cơ chế replication của Kafka; hỗ trợ coordination giữa các consumer.
(c) Lưu trong bộ nhớ (mặc định)
- Phù hợp: Môi trường test hoặc không cần bền vững.
- Lưu ý: Offset mất khi restart engine.
2. Tự xây dựng backend lưu trữ
Triển khai giao diện OffsetBackingStore để lưu offset vào hệ thống riêng (ví dụ: PostgreSQL):
public class PostgreOffsetStore implements OffsetBackingStore {
private Connection conn;
@Override
public void start() {
conn = DriverManager.getConnection("jdbc:postgresql://localhost/cdc");
}
@Override
public void stop() {
if (conn != null) conn.close();
}
@Override
public Map<String, Object> read() {
try (PreparedStatement stmt = conn.prepareStatement(
"SELECT data FROM offset_store WHERE task_id = ?")) {
stmt.setString(1, taskId);
ResultSet rs = stmt.executeQuery();
if (rs.next()) {
return deserialize(rs.getString("data"));
}
}
return Collections.emptyMap();
}
@Override
public void write(Map<String, Object> offsets) {
String serialized = serialize(offsets);
try (PreparedStatement stmt = conn.prepareStatement(
"INSERT INTO offset_store VALUES (?, ?) ON CONFLICT (task_id) DO UPDATE SET data = ?")) {
stmt.setString(1, taskId);
stmt.setString(2, serialized);
stmt.setString(3, serialized);
stmt.executeUpdate();
}
}
}
Xử lý offset tùy chỉnh
1. Kiểm soát thời điểm commit
Tạo policy chỉ commit khi xử lý thành công:
public class ConditionalCommitPolicy implements OffsetCommitPolicy {
private volatile boolean shouldCommit = false;
public void markSuccess() { this.shouldCommit = true; }
public void markFailure() { this.shouldCommit = false; }
@Override
public boolean shouldCommit(Map<String, Object> current, Map<String, Object> last) {
boolean result = shouldCommit;
shouldCommit = false; // reset sau mỗi lần kiểm tra
return result;
}
}
2. Theo dõi sự kiện commit
Gắn listener để log hoặc cảnh báo:
engine.setOffsetCommitListener((offsets, success) -> {
if (success) {
System.out.println("[OK] Đã lưu offset: " + offsets);
} else {
System.err.println("[LỖI] Không thể lưu offset: " + offsets);
}
});
Phục hồi sau sự cố
1. Quy trình tự động
- Khởi động → đọc offset từ storage.
- Dùng offset để xác định vị trí trong transaction log (binlog, WAL...).
- Lọc bản ghi trùng nhờ ID giao dịch hoặc khóa duy nhất.
2. Xử lý thủ công
(a) File offset bị hỏng
// Reset về vị trí cụ thể
Map<String, Object> recoveryPoint = new HashMap<>();
recoveryPoint.put("file", "mysql-bin.000123");
recoveryPoint.put("position", 789);
engine.setManualOffset(recoveryPoint);
(b) Transaction log bị xoá
engine.setErrorHandler((error, msg) -> {
if (error instanceof LogPositionLostException) {
engine.setManualOffset(Map.of("reset", "earliest"));
}
});
Chiến lược reset offset
1. Reset trong lúc chạy
// Về đầu stream
engine.setManualOffset(Map.of("reset", "earliest"));
// Bỏ qua dữ liệu cũ
engine.setManualOffset(Map.of("reset", "latest"));
// Về thời điểm cụ thể
Map<String, Object> timeBased = new HashMap<>();
timeBased.put("timestamp", Instant.parse("2023-06-01T00:00:00Z").toEpochMilli());
engine.setManualOffset(timeBased);
2. Reset qua cấu hình khởi động
Configuration setup = Configuration.create()
.with("offset.reset.strategy", "earliest") // hoặc "latest"
.build();
3. Kết hợp với snapshot
// Bắt đầu từ bản ghi có ID=500 trong bảng orders
Map<String, Object> snapshotStart = Map.of(
"snapshot.starting.position", "orders:500"
);
engine.setManualOffset(snapshotStart);
Thực hành tốt nhất
- Production: Dùng Kafka làm storage, replication factor ≥ 3.
- Giám sát: Theo dõi metric
lagqua JMX; cảnh báo khi commit thất bại. - Tách luồng xử lý: Dùng thread pool riêng cho việc commit offset:
ExecutorService commitPool = Executors.newFixedThreadPool(1);
engine.setOffsetCommitExecutor(commitPool);
- Kiểm thử: Mô phỏng crash, failover DB để xác minh khả năng phục hồi.
| Tình huống | Giải pháp đề xuất |
|---|---|
| Ứng dụng đơn giản | File storage + backup định kỳ |
| Hệ thống phân tán | Kafka storage + replication |
| Cần kiểm soát chính xác | Custom store + database |
| Phục hồi lịch sử | Snapshot + timestamp reset |
Triển khai commit offset sau khi xử lý thành công
Để đảm bảo tính toàn vẹn dữ liệu, ta cần commit offset chỉ khi event đã được xử lý thành công ở hệ thống đích.
Bước 1: Tạo policy commit điều kiện
public class SuccessDrivenPolicy implements OffsetCommitPolicy {
private final AtomicBoolean successFlag = new AtomicBoolean(false);
public void onSuccess() { successFlag.set(true); }
public void onFailure() { successFlag.set(false); }
@Override
public boolean shouldCommit(Map<String, Object> curr, Map<String, Object> prev) {
return successFlag.getAndSet(false);
}
}
Bước 2: Gắn vào engine và xử lý event
Properties config = new Properties();
config.setProperty("offset.commit.policy", SuccessDrivenPolicy.class.getName());
// ... các cấu hình khác
SuccessDrivenPolicy commitPolicy = new SuccessDrivenPolicy();
DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
.using(config)
.notifying(event -> {
try {
forwardToDownstream(event); // Gửi đến Kafka, Elasticsearch...
commitPolicy.onSuccess(); // Đánh dấu thành công
} catch (Exception ex) {
commitPolicy.onFailure(); // Đánh dấu thất bại
throw ex; // Engine sẽ retry
}
})
.build();
Bước 3: Xác minh hành vi
- Nếu
forwardToDownstream()thành công → offset được commit. - Nếu xảy ra exception → offset giữ nguyên → event được xử lý lại sau restart.
Tối ưu nâng cao
- Batch commit: Commit sau mỗi N event thành công để giảm I/O.
- Dead-letter queue: Ghi event lỗi vào topic riêng để xử lý sau.
- Metrics: Đo lường throughput và latency qua Micrometer hoặc Prometheus.