Bảo đảm Tính nhất quán Dữ liệu trong Hàng đợi Tin nhắn

Nguyên nhân gây ra vấn đề nhất quán dữ liệu

Các sự cố phổ biến trong hệ thống hàng đợi tin nhắn bao gồm:

  1. Lỗi nhà sản xuất: Tin nhắn được nhận bởi broker nhưng chưa ghi đĩa trước khi mất điện
  2. Lỗi người tiêu dùng: Xử lý nghiệp vụ thất bại sau khi tiêu thụ tin nhắn thành công
  3. Phân phối trùng lặp: Do bất ổn mạng gây ra gửi tin nhắn nhiều lần
  4. Bất đồng bộ trạng thái: Dữ liệu ứng dụng và trạng thái tin nhắn không khớp

Giải pháp ngăn chặn mất tin nhắn

Giao dịch tin nhắn hai giai đoạn

Ví dụ triển khai trong RocketMQ:

TransactionProducer txProducer = new TransactionProducer("tx_group");
txProducer.setHandler(new TransactionHandler() {
    public TransactionStatus execute(Message msg, Object ctx) {
        return performBusiness() ? COMMITTED : ROLLED_BACK;
    }
    
    public TransactionStatus verify(Message msg) {
        return verifyTransaction(msg.getId()) ? COMMITTED : ROLLED_BACK;
    }
});

Cấu hình lưu trữ bền vững

Thiết lập quan trọng cho RabbitMQ:

Cấu hìnhVí dụTác dụng
Durable queuedurable=trueBảo toàn metadata hàng đợi
Persistent messagedeliveryMode=2Lưu tin nhắn lên đĩa
Lazy Queuex-queue-mode=lazyGhi trực tiếp vào ổ đĩa
Xác nhận nhà sản xuấtpublisher-confirmsXác nhận gửi tin thành công

Thiết lập bản sao dự phòng

Công nghệCấu hình khuyến nghị
Kafkaacks=all, min.insync.replicas=2
RocketMQSYNC_FLUSH, master-slave đồng bộ
PulsarBookKeeper đa bản sao

Giải pháp xử lý tiêu thụ trùng lặp

Định danh duy nhất

UniqueIDGenerator generator = new UniqueIDGenerator();
String messageId = "TXN_" + generator.nextID();

if (cacheClient.setIfAbsent(messageId, "LOCKED")) {
    cacheClient.expire(messageId, 48 * 3600);
    executeBusinessLogic();
}

Thiết kế xử lý lặp an toàn

Kịch bảnGiải phápCơ chế
Nhất quán mạnhSELECT ... FOR UPDATEKhóa hàng cơ sở dữ liệu
Nhất quán cuối cùngKiểm soát phiên bảnThử lại có kiểm soát
Bồi hoànThao tác đảo ngượcGhi nhật ký bắt buộc

Hàng đợi tin nhắn lỗi

channel.basicConsume(queueName, false, callback);

void process(Message msg) {
    try {
        processBusiness();
        channel.ack(msg);
    } catch (Exception e) {
        if (retryCount++ < MAX_RETRY) {
            channel.nack(msg, true);
        } else {
            channel.nack(msg, false);
        }
    }
}

Kinh nghiệm triển khai thực tế

  1. Sử dụng ID nghiệp vụ thay vì ID hệ thống
  2. Triển khai logic idempotent cho mọi xử lý tin nhắn
  3. Đồng bộ giao dịch cơ sở dữ liệu và hàng đợi
  4. Giới hạn luồng tiêu thụ theo phân vùng
  5. Giám sát hàng đợi lỗi chặt chẽ
  6. Kiểm thử điều kiện mạng bất ổn
  7. Hỗ trợ phiên bản cho cấu trúc tin nhắn
  8. Không dùng hàng đợi làm luồng chính
  9. Lưu vị trí offset định kỳ
  10. Liên kết chỉ số nghiệp vụ và giám sát hàng đợi

Thẻ: rabbitmq kafka RocketMQ Idempotency DistributedSystems

Đăng vào ngày 27 tháng 6 lúc 08:16