Sử Dụng Hàng Đợi Trì Hoãn RabbitMQ Trong .NET Core

Hàng đợi trì hoãn, tương tự như hàng đợi thư chết

Cài đặt hàng đợi và tham số:

Tên tham số Kiểu dữ liệu Mô tả
x-message-tti (Time-To-Live) int, mili giây Đặt thời gian dự kiến cho tin nhắn, sau khi hết hạn sẽ bị loại bỏ
x-max-length int Giới hạn độ dài tối đa của hàng đợi, số lượng, xóa phần tử cũ nhất khi thêm mới
x-expires int, mili giây Thời gian tự động xóa hàng đợi khi không có truy cập
x-max-length-bytes int Giới hạn dung lượng tối đa của hàng đợi
x-dead-letter-exchange String Chỉ định bộ chuyển đổi thư chết
x-dead-letter-routing-key String Khóa định tuyến thư chết, chỉ định routingKey
x-max-priority int Độ ưu tiên của hàng đợi
x-queue-mode Mặc định "lazy" Chế độ hàng đợi, mặc định lazy (lưu dữ liệu vào đĩa, đưa vào bộ nhớ khi tiêu thụ)
x-queue-master-locator QueueBuilder.MasterLocator Chiến lược chọn hàng đợi chính, min-masters: chọn nút có số lượng kết nối nhỏ nhất, client-local: chọn nút mà khách hàng khai báo hàng đợi kết nối, min-masters: chọn ngẫu nhiên một nút

Gửi tin nhắn

/// <summary>
/// Thiết lập kết nối với hỗ trợ đa luồng
/// </summary>
public void SetupConnection()
{
    if (_connection == null)
    {
        lock (_connectionLock)
        {
            var factory = new ConnectionFactory()
            {
                HostName = _rabbitOptions.Host,
                VirtualHost = _rabbitOptions.VirtualHost,
                Password = _rabbitOptions.Password,
                UserName = _rabbitOptions.Username,
                Port = _rabbitOptions.Port
            };
            _connection = factory.CreateConnection();
        }
    }
}
/// <summary>
/// Gửi tin nhắn với độ trễ
/// </summary>
/// <param name="messageContent">Nội dung tin nhắn</param>
public void SendDelayedMessage(string messageContent)
{
    // Thiết lập kết nối
    SetupConnection();
    // Tạo kênh
    var channel = _connection.CreateModel();
    var queueArguments = new Dictionary<string, object>();
    
    // Thiết lập thời gian sống cho tin nhắn
    queueArguments.Add("x-message-ttl", 15000); // 15 giây cho tin nhắn
    queueArguments.Add("x-dead-letter-exchange", "deadLetterExchange"); // Đổi hướng tin nhắn hết hạn
    queueArguments.Add("x-dead-letter-routing-key", "delayedRoutingKey"); // Khóa định tuyến cho tin nhắn hết hạn
    
    // Khai báo bộ chuyển đổi
    channel.ExchangeDeclare("delayExchange", ExchangeType.Direct, true, false, null);
    
    // Khai báo hàng đợi với các tham số
    channel.QueueDeclare(queue: "delayQueue",
        durable: true,
        exclusive: false,
        autoDelete: false,
        arguments: queueArguments);
    
    // Liên kết hàng đợi với bộ chuyển đổi
    channel.QueueBind(queue: "delayQueue", "delayExchange", "delayRoute");
    
    var message = messageContent;
    var messageBytes = Encoding.UTF8.GetBytes(message);
    
    // Gửi tin nhắn đến hàng đợi
    channel.BasicPublish(exchange: "delayExchange",
        routingKey: "delayRoute",
        basicProperties: null,
        body: messageBytes);
    
    Console.WriteLine($"[x] Đã gửi: {message}");
}

Nhận tin nhắn

/// <summary>
/// Nhận tin nhắn từ hàng đợi trì hoãn
/// </summary>
/// <param name="messageHandler">Hàm xử lý tin nhắn</param>
public void ReceiveDelayedMessages(Func<string, bool> messageHandler)
{
    // Thiết lập kết nối
    SetupConnection();
    // Tạo kênh
    var channel = _connection.CreateModel();
    
    // Khai báo bộ chuyển đổi và hàng đợi
    channel.ExchangeDeclare(exchange: "deadLetterExchange", type: "direct", true, false, null);
    channel.QueueDeclare("deadLetterQueue", true, false, false, null);
    channel.QueueBind(queue: "deadLetterQueue", exchange: "deadLetterExchange", routingKey: "delayedRoutingKey");
    
    // Thiết lập consumer
    var consumer = new EventingBasicConsumer(channel);
    consumer.Received += (model, ea) =>
    {
        var messageBytes = ea.Body.ToArray();
        var message = Encoding.UTF8.GetString(messageBytes);
        var routingKey = ea.RoutingKey;
        
        Console.WriteLine($"Khóa định tuyến: {routingKey}");
        Console.WriteLine($"[x] Nhận được: {message}");
        
        // Xử lý tin nhắn
        messageHandler(message);
    };
    
    // Bắt đầu tiêu thụ
    channel.BasicConsume(queue: "deadLetterQueue",
        autoAck: true,
        consumer: consumer);
    
    Console.WriteLine("Nhấn Enter để thoát.");
}

Đăng vào ngày 29 tháng 5 lúc 19:42