Giải pháp Xử lý Lỗi và Tính Năng Nâng cao trong RabbitMQ

RabbitMQ cung cấp cơ chế xác nhận (Confirm) để đảm bảo rằng các tin nhắn đã được gửi đến exchange thành công.

await Kênh.XácNhậnChọnAsync();

ThuộcTínhCơBản properties = new ThuộcTínhCơBản()
{
    DeliveryMode = DeliveryModes.Persistent,
};

await Kênh.XuấtBảnCơBảnAsync(exchange: exchange, routingKey: routingKey, body: Encoding.UTF8.GetBytes(message),
    mandatory: true, basicProperties: properties);

if (await Kênh.ChờXácNhậnAsync())
{
    _logger.LogInformation($"{message}, tin nhắn đã được gửi thành công đến exchange");
};

// Khi đặt mandatory: true, nếu exchange không thể tìm thấy một queue phù hợp dựa trên loại của nó và routeKey, // RabbitMQ sẽ gọi phương thức basic.return để trả lại tin nhắn cho nhà sản xuất; // Khi đặt là false, trong trường hợp trên, broker sẽ loại bỏ tin nhắn trực tiếp.

public async ValueTask XuấtBảnCơBảnAsync(string tinNhan, string exchange, string routingKey, ThuộcTínhCơBản basicProperties, ReadOnlyMemory<byte> body = default, bool mandatory = false, CancellationToken cancellationToken = default)
{
    if (!Kênh.IsOpen) throw new Exception($"kết nối rabbitmq đã đóng {Kênh.IsOpen}");

    await Kênh.XácNhậnChọnAsync();
    ThuộcTínhCơBản properties = new ThuộcTínhCơBản()
    {
        DeliveryMode = DeliveryModes.Persistent,
    };
    
    await Kênh.XuấtBảnCơBảnAsync(exchange: exchange, routingKey: routingKey, body: Encoding.UTF8.GetBytes(tinNhan),
       mandatory: true, basicProperties: properties);

    if (await Kênh.ChờXácNhậnAsync())
    {
        _logger.LogInformation($"{tinNhan}, tin nhắn đã được gửi thành công đến exchange");
    };

    Kênh.BasicReturn += (sender, e) =>
    {
        _logger.LogInformation($"{tinNhan}, tin nhắn không khớp với route");
    };
}
Kênh.BasicNacks và Kênh.BasicAcks
// RabbitMQ sẽ gửi một xác nhận (Basic.Ack) cho nhà sản xuất (chứa ID duy nhất của tin nhắn),
// thông báo cho nhà sản xuất rằng tin nhắn đã đến đích chính xác.
// Nếu tin nhắn và hàng đợi là có thể lưu trữ (persistent), tin nhắn xác nhận sẽ được gửi sau khi tin nhắn được ghi vào đĩa.
public async ValueTask XuấtBảnCơBảnAsync(string tinNhan, string exchange, string routingKey, CancellationToken cancellationToken = default)
{
    if (!Kênh.IsOpen) throw new Exception($"kết nối rabbitmq đã đóng {Kênh.IsOpen}");
    await Kênh.XácNhậnChọnAsync();

    ThuộcTínhCơBản properties = new ThuộcTínhCơBản()
    {
        DeliveryMode = DeliveryModes.Persistent,
    };
    await Kênh.XuấtBảnCơBảnAsync(exchange: exchange, routingKey: routingKey, body: Encoding.UTF8.GetBytes(tinNhan),
        mandatory: true, basicProperties: properties);

    // Khi sử dụng RabbitMQ, chúng ta cần xem xét một vấn đề là sau khi nhà sản xuất gửi tin nhắn đi,
    // tin nhắn đó có đến được server một cách chính xác không? Mặc định, việc gửi tin nhắn sẽ không trả về bất kỳ thông tin nào cho nhà sản xuất,
    // nghĩa là mặc định nhà sản xuất không thể xác nhận tin nhắn có đến server một cách chính xác.

    // RabbitMQ đã cung cấp hai giải pháp cho vấn đề này:
    // Thực hiện thông qua cơ chế giao dịch (khác với khái niệm giao dịch trong cơ sở dữ liệu);
    // Thực hiện thông qua cơ chế xác nhận từ phía gửi (publisher confirm).

    // Cơ chế giao dịch
    // Các phương thức liên quan đến cơ chế giao dịch có: kênh.TxSelect, kênh.TxCommit và kênh.TxRollback.
    //    kênh.TxSelect đặt kênh hiện tại ở chế độ giao dịch, kênh.TxCommit được dùng để gửi giao dịch,
    //    kênh.TxRollback được dùng để hoàn tác giao dịch. Sau khi sử dụng kênh.TxSelect để bật giao dịch,
    //    nếu việc gửi tin nhắn đến RabbitMQ thành công và giao dịch được gửi đi thành công,
    //    thì tin nhắn chắc chắn đã đến RabbitMQ, nếu trước khi gửi giao dịch mà RabbitMQ gặp sự cố hoặc có ngoại lệ khác,
    //    có thể sử dụng kênh.TxRollback để hoàn tác giao dịch.

    Kênh.BasicAcks += (sender, e) =>
    {
        // RabbitMQ sẽ gửi một xác nhận (Basic.Ack) cho nhà sản xuất (chứa ID duy nhất của tin nhắn),
        // thông báo cho nhà sản xuất rằng tin nhắn đã đến đích chính xác.
        // Nếu tin nhắn và hàng đợi là có thể lưu trữ, tin nhắn xác nhận sẽ được gửi sau khi tin nhắn được ghi vào đĩa
        _logger.LogInformation($"{tinNhan}, tin nhắn đã đến Broker chính xác và Broker đã nhận được ACK");
    };
    Kênh.BasicNacks += (sender, e) =>
    {
        _logger.LogInformation($"{tinNhan}, tin nhắn không đến Broker một cách chính xác");
    };
    Kênh.BasicReturn += (sender, e) =>
    {
        _logger.LogInformation($"{tinNhan}, tin nhắn đã đến Broker nhưng không khớp với route");
    };
}

Hàng đợi ưu tiên

// Thuộc tính x-max-priority phải được đặt, nếu không thì mức độ ưu tiên của tin nhắn sẽ không có hiệu lực

kênh.KhaiBáoHàngĐợi(queueName, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object> { { "x-max-priority", 10 } });
var properties = kênh.TạoThuộcTínhCơBản();
mứcƯuTiện = (byte)random.Next(1, 10);
properties.Priority = mứcƯuTiện; // Đặt mức độ ưu tiên của tin nhắn, giá trị từ 1 đến 9.
var tinNhan = TiềnTinNhan + i.ToString() + "____" + mứcƯuTiện;
var body = Encoding.UTF8.GetBytes(tinNhan);
kênh.XuấtBảnCơBản(exchange: exchange, routingKey: ExchangeType.Fanout, basicProperties: properties, body: body);
Console.WriteLine($"{DateTime.Now.ToString()} Gửi {tinNhan} , Mức ưu tiên {mứcƯuTiện}");

Tiêu thụ tin nhắn theo thứ tự

Nhà sản xuất gửi tin nhắn ví dụ: tin nhắn đặt hàng ----> tin nhắn thanh toán ----> tin nhắn có thứ tự, tức là tin nhắn do nhà sản xuất gửi có thứ tự. Gửi tin nhắn đến một hàng đợi cùng một lúc và một người tiêu thụ tiêu thụ có thể đảm bảo tiêu thụ theo thứ tự.

Chế độ hoạt động đơn: chỉ có một người tiêu thụ có thể tiêu thụ, nhiều người tiêu thụ khác chỉ có thể ở trạng thái nhàn rỗi. Khi người tiêu thụ đang hoạt động gặp sự cố, chỉ có một người tiêu thụ khác có thể tiếp tục tiêu thụ.

Cấu hình người tiêu thụ đơn khi khai báo hàng đợi, thêm tham số x-single-active-consumer

IDictionary dic = base.LấyĐốiSốHàngĐợi(exchangeName, queueName, LấyGiâyTTL()); dic.Add("x-single-active-consumer", true);

Trạng thái hàng đợi SAC của Rabbitmq

Thẻ: rabbitmq message-queue publisher-confirm priority-queue message-consumption

Đăng vào ngày 4 tháng 7 lúc 16:01