Trong hệ thống thương mại điện tử, khi khách hàng tạo đơn hàng nhưng không thanh toán trong 30 phút, hệ thống cần tự động cập nhật trạng thái đơn hàng. Giải pháp hàng đợi trễ (delay queue) sử dụng Kafka có thể giải quyết vấn đề này một cách hiệu quả.
public static class OrderQueueConfig
{
public const string NewOrderTopic = "new-order";
public const string TimeoutTopic = "order-timeout";
}Triển khai bộ tiêu thụ (consumer) xử lý hàng đợi trễ:
public void ProcessTimeoutOrders()
{
var config = CreateConsumerConfig();
config.AutoOffsetReset = AutoOffsetReset.Earliest;
config.GroupId = "order-processing";
using var consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe(OrderQueueConfig.TimeoutTopic);
// Khôi phục vị trí đọc từ Redis
var offsetKey = $"offset:{OrderQueueConfig.TimeoutTopic}";
var currentOffset = _cache.GetInt(offsetKey) ?? 0;
// Thiết lập điểm bắt đầu tiêu thụ
consumer.Assign(new TopicPartitionOffset(
OrderQueueConfig.TimeoutTopic,
Partition.Unassigned,
new Offset(currentOffset + 1)
));
while (true)
{
// Tạm dừng tiêu thụ trong 60 giây
consumer.Pause(new[] {
new TopicPartition(OrderQueueConfig.TimeoutTopic, Partition.Unassigned)
});
// Kích hoạt lại sau khoảng thời gian định trước
var timer = new Timer(_ =>
{
consumer.Resume(new[] {
new TopicPartition(OrderQueueConfig.TimeoutTopic, Partition.Unassigned)
});
}, null, 60000, Timeout.Infinite);
try
{
var message = consumer.Consume();
_logger.LogInformation($"Vị trí: {message.Offset}, Nội dung: {message.Value}");
// Xử lý nghiệp vụ: Cập nhật trạng thái đơn hàng
ProcessOrderTimeout(message.Value);
// Cập nhật offset mới
_cache.SetInt(offsetKey, (int)message.Offset);
}
catch (Exception ex)
{
_logger.LogError(ex, "Lỗi xử lý đơn hàng trễ hạn");
}
}
}Dịch vụ nền để chạy bộ tiêu thụ:
public class OrderTimeoutService : BackgroundService
{
private readonly OrderProcessor _processor;
public OrderTimeoutService(OrderProcessor processor)
{
_processor = processor;
}
protected override Task ExecuteAsync(CancellationToken token)
{
_processor.ProcessTimeoutOrders();
return Task.CompletedTask;
}
}API gửi thông báo đơn hàng mới:
[HttpGet("notify")]
public IActionResult NotifyOrder(string orderData)
{
_orderPublisher.Publish(
OrderQueueConfig.NewOrderTopic,
Guid.NewGuid().ToString(),
orderData
);
return Ok();
}Hệ thống sẽ xử lý các đơn hàng trễ hạn theo chu kỳ 60 giây, đảm bảo cập nhật trạng thái tự động sau khoảng thời gian quy định.