Hàng đợi trì hoãn, tương tự như hàng đợi thư chết
Cài đặt hàng đợi và tham số:
| Tên tham số | Kiểu dữ liệu | Mô tả |
|---|---|---|
| x-message-tti (Time-To-Live) | int, mili giây | Đặt thời gian dự kiến cho tin nhắn, sau khi hết hạn sẽ bị loại bỏ |
| x-max-length | int | Giới hạn độ dài tối đa của hàng đợi, số lượng, xóa phần tử cũ nhất khi thêm mới |
| x-expires | int, mili giây | Thời gian tự động xóa hàng đợi khi không có truy cập |
| x-max-length-bytes | int | Giới hạn dung lượng tối đa của hàng đợi |
| x-dead-letter-exchange | String | Chỉ định bộ chuyển đổi thư chết |
| x-dead-letter-routing-key | String | Khóa định tuyến thư chết, chỉ định routingKey |
| x-max-priority | int | Độ ưu tiên của hàng đợi |
| x-queue-mode | Mặc định "lazy" | Chế độ hàng đợi, mặc định lazy (lưu dữ liệu vào đĩa, đưa vào bộ nhớ khi tiêu thụ) |
| x-queue-master-locator | QueueBuilder.MasterLocator | Chiến lược chọn hàng đợi chính, min-masters: chọn nút có số lượng kết nối nhỏ nhất, client-local: chọn nút mà khách hàng khai báo hàng đợi kết nối, min-masters: chọn ngẫu nhiên một nút |
Gửi tin nhắn
/// <summary>
/// Thiết lập kết nối với hỗ trợ đa luồng
/// </summary>
public void SetupConnection()
{
if (_connection == null)
{
lock (_connectionLock)
{
var factory = new ConnectionFactory()
{
HostName = _rabbitOptions.Host,
VirtualHost = _rabbitOptions.VirtualHost,
Password = _rabbitOptions.Password,
UserName = _rabbitOptions.Username,
Port = _rabbitOptions.Port
};
_connection = factory.CreateConnection();
}
}
}
/// <summary>
/// Gửi tin nhắn với độ trễ
/// </summary>
/// <param name="messageContent">Nội dung tin nhắn</param>
public void SendDelayedMessage(string messageContent)
{
// Thiết lập kết nối
SetupConnection();
// Tạo kênh
var channel = _connection.CreateModel();
var queueArguments = new Dictionary<string, object>();
// Thiết lập thời gian sống cho tin nhắn
queueArguments.Add("x-message-ttl", 15000); // 15 giây cho tin nhắn
queueArguments.Add("x-dead-letter-exchange", "deadLetterExchange"); // Đổi hướng tin nhắn hết hạn
queueArguments.Add("x-dead-letter-routing-key", "delayedRoutingKey"); // Khóa định tuyến cho tin nhắn hết hạn
// Khai báo bộ chuyển đổi
channel.ExchangeDeclare("delayExchange", ExchangeType.Direct, true, false, null);
// Khai báo hàng đợi với các tham số
channel.QueueDeclare(queue: "delayQueue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: queueArguments);
// Liên kết hàng đợi với bộ chuyển đổi
channel.QueueBind(queue: "delayQueue", "delayExchange", "delayRoute");
var message = messageContent;
var messageBytes = Encoding.UTF8.GetBytes(message);
// Gửi tin nhắn đến hàng đợi
channel.BasicPublish(exchange: "delayExchange",
routingKey: "delayRoute",
basicProperties: null,
body: messageBytes);
Console.WriteLine($"[x] Đã gửi: {message}");
}
Nhận tin nhắn
/// <summary>
/// Nhận tin nhắn từ hàng đợi trì hoãn
/// </summary>
/// <param name="messageHandler">Hàm xử lý tin nhắn</param>
public void ReceiveDelayedMessages(Func<string, bool> messageHandler)
{
// Thiết lập kết nối
SetupConnection();
// Tạo kênh
var channel = _connection.CreateModel();
// Khai báo bộ chuyển đổi và hàng đợi
channel.ExchangeDeclare(exchange: "deadLetterExchange", type: "direct", true, false, null);
channel.QueueDeclare("deadLetterQueue", true, false, false, null);
channel.QueueBind(queue: "deadLetterQueue", exchange: "deadLetterExchange", routingKey: "delayedRoutingKey");
// Thiết lập consumer
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var messageBytes = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(messageBytes);
var routingKey = ea.RoutingKey;
Console.WriteLine($"Khóa định tuyến: {routingKey}");
Console.WriteLine($"[x] Nhận được: {message}");
// Xử lý tin nhắn
messageHandler(message);
};
// Bắt đầu tiêu thụ
channel.BasicConsume(queue: "deadLetterQueue",
autoAck: true,
consumer: consumer);
Console.WriteLine("Nhấn Enter để thoát.");
}