Xử lý Đơn Hàng Trễ Hạn với Cơ Chế Hàng Đợi Trong .NET Core

Trong hệ thống thương mại điện tử, khi khách hàng tạo đơn hàng nhưng không thanh toán trong 30 phút, hệ thống cần tự động cập nhật trạng thái đơn hàng. Giải pháp hàng đợi trễ (delay queue) sử dụng Kafka có thể giải quyết vấn đề này một cách hiệu quả.

public static class OrderQueueConfig
{
    public const string NewOrderTopic = "new-order";
    public const string TimeoutTopic = "order-timeout";
}

Triển khai bộ tiêu thụ (consumer) xử lý hàng đợi trễ:

public void ProcessTimeoutOrders()
{
    var config = CreateConsumerConfig();
    config.AutoOffsetReset = AutoOffsetReset.Earliest;
    config.GroupId = "order-processing";
    
    using var consumer = new ConsumerBuilder<string, string>(config).Build();
    consumer.Subscribe(OrderQueueConfig.TimeoutTopic);

    // Khôi phục vị trí đọc từ Redis
    var offsetKey = $"offset:{OrderQueueConfig.TimeoutTopic}";
    var currentOffset = _cache.GetInt(offsetKey) ?? 0;
    
    // Thiết lập điểm bắt đầu tiêu thụ
    consumer.Assign(new TopicPartitionOffset(
        OrderQueueConfig.TimeoutTopic, 
        Partition.Unassigned, 
        new Offset(currentOffset + 1)
    ));

    while (true)
    {
        // Tạm dừng tiêu thụ trong 60 giây
        consumer.Pause(new[] { 
            new TopicPartition(OrderQueueConfig.TimeoutTopic, Partition.Unassigned) 
        });
        
        // Kích hoạt lại sau khoảng thời gian định trước
        var timer = new Timer(_ => 
        {
            consumer.Resume(new[] { 
                new TopicPartition(OrderQueueConfig.TimeoutTopic, Partition.Unassigned) 
            });
        }, null, 60000, Timeout.Infinite);
        
        try
        {
            var message = consumer.Consume();
            _logger.LogInformation($"Vị trí: {message.Offset}, Nội dung: {message.Value}");
            
            // Xử lý nghiệp vụ: Cập nhật trạng thái đơn hàng
            ProcessOrderTimeout(message.Value);
            
            // Cập nhật offset mới
            _cache.SetInt(offsetKey, (int)message.Offset);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Lỗi xử lý đơn hàng trễ hạn");
        }
    }
}

Dịch vụ nền để chạy bộ tiêu thụ:

public class OrderTimeoutService : BackgroundService
{
    private readonly OrderProcessor _processor;

    public OrderTimeoutService(OrderProcessor processor)
    {
        _processor = processor;
    }

    protected override Task ExecuteAsync(CancellationToken token)
    {
        _processor.ProcessTimeoutOrders();
        return Task.CompletedTask;
    }
}

API gửi thông báo đơn hàng mới:

[HttpGet("notify")]
public IActionResult NotifyOrder(string orderData)
{
    _orderPublisher.Publish(
        OrderQueueConfig.NewOrderTopic, 
        Guid.NewGuid().ToString(), 
        orderData
    );
    return Ok();
}

Hệ thống sẽ xử lý các đơn hàng trễ hạn theo chu kỳ 60 giây, đảm bảo cập nhật trạng thái tự động sau khoảng thời gian quy định.

Thẻ: kafka .NET Core 6 Redis BackgroundService

Đăng vào ngày 16 tháng 6 lúc 07:16