Kết nối RabbitMQ sử dụng Mẫu Singleton
Để quản lý một kết nối duy nhất đến RabbitMQ, chúng ta có thể sử dụng Mẫu Singleton. Dưới đây là một triển khai:
public interface IRabbitMqConnection
{
IConnection Connection { get; }
}
public class RabbitMqConnection : IRabbitMqConnection
{
private static readonly RabbitMqConnection instance = new RabbitMqConnection();
private readonly ILogger logger;
private static readonly object syncRoot = new object();
public IConnection Connection { get; private set; }
// Constructor riêng tư để ngăn chặn việc khởi tạo từ bên ngoài
private RabbitMqConnection() { }
// Constructor riêng tư để khởi tạo kết nối
private RabbitMqConnection(RabbitMqOptions options, string clientName, ILogger logger)
{
this.logger = logger;
var factory = new ConnectionFactory
{
HostName = options.HostName,
Port = options.Port,
UserName = options.UserName,
Password = options.Password,
VirtualHost = options.VirtualHost,
AutomaticRecoveryEnabled = true
};
// Sử dụng Polly để thử lại khi gặp lỗi kết nối
var retryPolicy = Policy
.Handle<SocketException>()
.WaitAndRetry(3, retryAttempt => TimeSpan.FromSeconds(1), (ex, time) =>
{
if (retryAttempt == 3)
{
logger.LogWarning($"Không thể kết nối sau 3 lần thử: {ex.Message}");
throw ex;
}
});
retryPolicy.Execute(() =>
{
Connection = factory.CreateConnection(clientName);
});
}
// Phương thức công khai để lấy instance duy nhất
public static RabbitMqConnection GetInstance(RabbitMqOptions options, string clientName, ILogger logger)
{
if (instance.Connection == null)
{
lock (syncRoot)
{
if (instance.Connection == null)
{
new RabbitMqConnection(options, clientName, logger);
}
}
}
return instance;
}
}
Lớp trừu tượng cho Dịch vụ Người tiêu dùng Kafka
Triển khai một lớp cơ sở trừu tượng cho các dịch vụ `IHostedService` để tiêu thụ tin nhắn từ Kafka giúp tái sử dụng logic.
public abstract class BaseKafkaConsumerService : IHostedService
{
protected readonly IOptions<KafkaConfig> kafkaOptions;
private readonly IKafkaConsumerProvider consumerProvider;
private IConsumer<string, byte[]> kafkaConsumer;
protected BaseKafkaConsumerService(IKafkaConsumerProvider consumerProvider, IOptions<KafkaConfig> options)
{
this.kafkaOptions = options ?? throw new ArgumentNullException(nameof(options));
this.consumerProvider = consumerProvider;
}
// Cấu hình cho người tiêu dùng
protected abstract ConsumerConfig GetConsumerConfiguration();
// Danh sách các chủ đề cần theo dõi
public abstract List<string> GetSubscribedTopics();
// Xử lý tin nhắn nhận được
public abstract void HandleMessage(ConsumeResult<string, byte[]> message);
public Task StartAsync(CancellationToken cancellationToken)
{
var config = GetConsumerConfiguration();
kafkaConsumer = consumerProvider.CreateConsumer(config);
kafkaConsumer.Subscribe(GetSubscribedTopics());
Task.Run(() =>
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
var consumeResult = kafkaConsumer.Consume(cancellationToken);
if (consumeResult.IsPartitionEOF)
{
Console.WriteLine($"Đã đến cuối chủ đề {consumeResult.Topic}, phân vùng {consumeResult.Partition}, offset {consumeResult.Offset}.");
continue;
}
if (consumeResult != null)
{
HandleMessage(consumeResult);
}
try
{
kafkaConsumer.StoreOffset(consumeResult);
}
catch (KafkaException ex)
{
Console.WriteLine($"Lỗi khi lưu offset: {ex.Error.Reason}");
}
}
catch (OperationCanceledException)
{
// Bỏ qua nếu tác vụ bị hủy
}
catch (ConsumeException ex)
{
Console.WriteLine($"Lỗi khi tiêu thụ: {ex.Error.Reason}");
}
}
}, cancellationToken);
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
kafkaConsumer?.Close();
kafkaConsumer?.Dispose();
return Task.CompletedTask;
}
}
// Triển khai cụ thể 1
public class OrderProcessingConsumer : BaseKafkaConsumerService
{
public OrderProcessingConsumer(IKafkaConsumerProvider provider, IOptions<KafkaConfig> options) : base(provider, options) { }
protected override ConsumerConfig GetConsumerConfiguration()
{
return new ConsumerConfig
{
BootstrapServers = kafkaOptions.Value.BootstrapServers,
AutoOffsetReset = AutoOffsetReset.Latest,
GroupId = "order-processor",
EnableAutoCommit = false
};
}
public override List<string> GetSubscribedTopics()
{
return new List<string> { "orders" };
}
public override void HandleMessage(ConsumeResult<string, byte[]> message)
{
var messageBody = Encoding.UTF8.GetString(message.Message.Value);
Console.WriteLine($"Đã xử lý tin nhắn mới: {messageBody}");
}
}
// Triển khai cụ thể 2
public class OrderProcessingConsumerV2 : BaseKafkaConsumerService
{
public OrderProcessingConsumerV2(IKafkaConsumerProvider provider, IOptions<KafkaConfig> options) : base(provider, options) { }
protected override ConsumerConfig GetConsumerConfiguration()
{
return new ConsumerConfig
{
BootstrapServers = kafkaOptions.Value.BootstrapServers,
AutoOffsetReset = AutoOffsetReset.Latest,
GroupId = "order-processor-v2",
EnableAutoCommit = false
};
}
public override List<string> GetSubscribedTopics()
{
return new List<string> { "orders" };
}
public override void HandleMessage(ConsumeResult<string, byte[]> message)
{
var messageBody = Encoding.UTF8.GetString(message.Message.Value);
Console.WriteLine($"Đã xử lý tin nhắn mới (phiên bản 2): {messageBody}");
}
}
Đăng ký Dịch vụ Hosted Service theo Lô
Thay vì đăng ký từng dịch vụ một, chúng ta có thể quét các loại triển khai của lớp cơ sở và đăng ký chúng tự động.
public static class ServiceRegistrationExtensions
{
public static IServiceCollection AddKafkaConsumers(this IServiceCollection services, IConfigurationSection kafkaConfigSection)
{
services.Configure<KafkaConfig>(kafkaConfigSection);
services.AddSingleton<IKafkaConsumerProvider, KafkaConsumerProvider>();
var baseConsumerType = typeof(BaseKafkaConsumerService);
// Tìm tất cả các lớp kế thừa từ BaseKafkaConsumerService trong assembly hiện tại
var consumerTypes = Assembly.GetEntryAssembly()
.GetExportedTypes()
.Where(t => t.IsSubclassOf(baseConsumerType) && !t.IsAbstract)
.ToList();
foreach (var consumerType in consumerTypes)
{
services.AddSingleton(typeof(IHostedService), consumerType);
}
return services;
}
public static IServiceCollection AddKafkaProducer(this IServiceCollection services, IConfigurationSection kafkaConfigSection)
{
services.Configure<KafkaConfig>(kafkaConfigSection);
services.AddSingleton<IKafkaProducer>(sp =>
{
var config = sp.GetRequiredService<IOptions<KafkaConfig>>().Value;
return KafkaProducer.CreateInstance(producerConfig =>
{
producerConfig.BootstrapServers = config.BootstrapServers;
producerConfig.MessageTimeoutMs = 5000;
producerConfig.EnableIdempotence = true;
});
});
return services;
}
}