Gửi tin nhắn theo lô với hỗ trợ giao dịch: Đảm bảo tính toàn vẹn dữ liệu
Trong môi trường phân tán, việc đảm bảo rằng một nhóm tin nhắn được gửi thành công đồng thời hoặc hoàn tác nếu có lỗi là rất quan trọng. Kafka cung cấp cơ chế giao dịch (transaction) cho phép thực hiện điều này — tất cả các bản ghi đều được cam kết (commit) nếu thành công, hoặc bị hủy bỏ (abort) nếu xảy ra sự cố.
Lưu ý quan trọng: Để kích hoạt giao dịch, bạn phải cấu hình TransactionalId. Ngoài ra, cần bật tính năng sản xuất idempotent để tránh gửi trùng khi thử lại.
Ví dụ triển khai gửi theo lô với giao dịch
public void SendBatchMessages(List<string> messages)
{
var config = new ProducerConfig
{
BootstrapServers = "localhost:9092",
MessageTimeoutMs = 5000,
EnableIdempotence = true, // Bật chế độ idempotent
TransactionalId = Guid.NewGuid().ToString() // ID giao dịch bắt buộc
};
using var producer = new ProducerBuilder<string, string>(config)
.SetDefaultPartitioner(Partitioner.Random) // Phân vùng ngẫu nhiên
.Build();
try
{
// Khởi tạo ngữ cảnh giao dịch
producer.InitTransactions(TimeSpan.FromSeconds(60));
producer.BeginTransaction();
foreach (var message in messages)
{
var deliveryResult = producer.ProduceAsync(
"order-events",
new Message<string, string>
{
Key = "batch-key",
Value = message
}).GetAwaiter().GetResult();
Console.WriteLine($"Đã gửi tin nhắn tới {deliveryResult.TopicPartitionOffset}");
}
// Cam kết toàn bộ giao dịch
producer.CommitTransaction();
}
catch (ProduceException<string, string> ex)
{
Console.WriteLine($"Lỗi khi gửi tin nhắn: {ex.Error.Reason}");
producer.AbortTransaction(); // Hủy giao dịch nếu có lỗi
}
}
Xử lý tiêu thụ tin nhắn với quản lý offset bằng Redis
Khi tiêu thụ tin nhắn từ Kafka, việc quản lý điểm dừng (offset) là then chốt để tránh mất dữ liệu hoặc xử lý trùng. Thay vì dựa vào commit tự động, chúng ta sẽ lưu trữ offset thủ công vào Redis để kiểm soát chính xác vị trí đọc.
Triển khai consumer với offset lưu trong Redis
public void ConsumeWithRedisOffset()
{
var config = new ConsumerConfig
{
BootstrapServers = "localhost:9092",
GroupId = "order-consumer-group",
EnableAutoCommit = false,
AutoOffsetReset = AutoOffsetReset.Earliest
};
using var consumer = new ConsumerBuilder<string, string>(config).Build();
// Lấy offset đã lưu từ Redis
var redisKey = "kafka_offset_order_events";
var savedOffsetStr = _redisCache.GetString(redisKey);
var startOffset = !string.IsNullOrEmpty(savedOffsetStr) ? long.Parse(savedOffsetStr) + 1 : 0;
// Gán vị trí bắt đầu tiêu thụ
consumer.Assign(new TopicPartitionOffset("order-events", 0, new Offset(startOffset)));
while (true)
{
try
{
var consumeResult = consumer.Consume(CancellationToken.None);
var currentOffset = consumeResult.Offset.Value;
// Xử lý tin nhắn
Console.WriteLine($"Tin nhắn nhận được: {consumeResult.Value}, Partition: {consumeResult.Partition}");
// Cập nhật offset mới vào Redis
_redisCache.SetString(redisKey, currentOffset.ToString());
}
catch (ConsumeException ex)
{
Console.WriteLine($"Lỗi khi tiêu thụ: {ex.Error.Reason}");
}
}
}
Tích hợp vào ứng dụng nền (Background Service)
Dưới đây là cách tích hợp consumer vào một dịch vụ nền chạy liên tục trong ứng dụng .NET Worker Service.
public class OrderConsumerWorker : BackgroundService
{
private readonly ILogger<OrderConsumerWorker> _logger;
private readonly KafkaConsumerService _consumerService;
public OrderConsumerWorker(ILogger<OrderConsumerWorker> logger, KafkaConsumerService consumerService)
{
_logger = logger;
_consumerService = consumerService;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await Task.Run(() => _consumerService.ConsumeWithRedisOffset(), stoppingToken);
}
}
API Controller để gọi gửi theo lô
Một endpoint đơn giản để kích hoạt việc gửi hàng loạt tin nhắn qua HTTP POST.
[ApiController]
[Route("[controller]")]
public class MessageController : ControllerBase
{
private readonly KafkaProducerService _producerService;
public MessageController(KafkaProducerService producerService)
{
_producerService = producerService;
}
[HttpPost("send-batch")]
public IActionResult SendBatch([FromBody] List<string> messages)
{
if (messages == null || !messages.Any())
return BadRequest("Danh sách tin nhắn không hợp lệ.");
_producerService.SendBatchMessages(messages);
return Ok("Gửi tin nhắn theo lô thành công.");
}
}