Nguyên nhân gây ra vấn đề nhất quán dữ liệu
Các sự cố phổ biến trong hệ thống hàng đợi tin nhắn bao gồm:
- Lỗi nhà sản xuất: Tin nhắn được nhận bởi broker nhưng chưa ghi đĩa trước khi mất điện
- Lỗi người tiêu dùng: Xử lý nghiệp vụ thất bại sau khi tiêu thụ tin nhắn thành công
- Phân phối trùng lặp: Do bất ổn mạng gây ra gửi tin nhắn nhiều lần
- Bất đồng bộ trạng thái: Dữ liệu ứng dụng và trạng thái tin nhắn không khớp
Giải pháp ngăn chặn mất tin nhắn
Giao dịch tin nhắn hai giai đoạn
Ví dụ triển khai trong RocketMQ:
TransactionProducer txProducer = new TransactionProducer("tx_group");
txProducer.setHandler(new TransactionHandler() {
public TransactionStatus execute(Message msg, Object ctx) {
return performBusiness() ? COMMITTED : ROLLED_BACK;
}
public TransactionStatus verify(Message msg) {
return verifyTransaction(msg.getId()) ? COMMITTED : ROLLED_BACK;
}
});
Cấu hình lưu trữ bền vững
Thiết lập quan trọng cho RabbitMQ:
| Cấu hình | Ví dụ | Tác dụng |
|---|---|---|
| Durable queue | durable=true | Bảo toàn metadata hàng đợi |
| Persistent message | deliveryMode=2 | Lưu tin nhắn lên đĩa |
| Lazy Queue | x-queue-mode=lazy | Ghi trực tiếp vào ổ đĩa |
| Xác nhận nhà sản xuất | publisher-confirms | Xác nhận gửi tin thành công |
Thiết lập bản sao dự phòng
| Công nghệ | Cấu hình khuyến nghị |
|---|---|
| Kafka | acks=all, min.insync.replicas=2 |
| RocketMQ | SYNC_FLUSH, master-slave đồng bộ |
| Pulsar | BookKeeper đa bản sao |
Giải pháp xử lý tiêu thụ trùng lặp
Định danh duy nhất
UniqueIDGenerator generator = new UniqueIDGenerator();
String messageId = "TXN_" + generator.nextID();
if (cacheClient.setIfAbsent(messageId, "LOCKED")) {
cacheClient.expire(messageId, 48 * 3600);
executeBusinessLogic();
}
Thiết kế xử lý lặp an toàn
| Kịch bản | Giải pháp | Cơ chế |
|---|---|---|
| Nhất quán mạnh | SELECT ... FOR UPDATE | Khóa hàng cơ sở dữ liệu |
| Nhất quán cuối cùng | Kiểm soát phiên bản | Thử lại có kiểm soát |
| Bồi hoàn | Thao tác đảo ngược | Ghi nhật ký bắt buộc |
Hàng đợi tin nhắn lỗi
channel.basicConsume(queueName, false, callback);
void process(Message msg) {
try {
processBusiness();
channel.ack(msg);
} catch (Exception e) {
if (retryCount++ < MAX_RETRY) {
channel.nack(msg, true);
} else {
channel.nack(msg, false);
}
}
}
Kinh nghiệm triển khai thực tế
- Sử dụng ID nghiệp vụ thay vì ID hệ thống
- Triển khai logic idempotent cho mọi xử lý tin nhắn
- Đồng bộ giao dịch cơ sở dữ liệu và hàng đợi
- Giới hạn luồng tiêu thụ theo phân vùng
- Giám sát hàng đợi lỗi chặt chẽ
- Kiểm thử điều kiện mạng bất ổn
- Hỗ trợ phiên bản cho cấu trúc tin nhắn
- Không dùng hàng đợi làm luồng chính
- Lưu vị trí offset định kỳ
- Liên kết chỉ số nghiệp vụ và giám sát hàng đợi