Mẫu Singleton và Đăng ký Dịch vụ Hosted Service trong .NET

Kết nối RabbitMQ sử dụng Mẫu Singleton

Để quản lý một kết nối duy nhất đến RabbitMQ, chúng ta có thể sử dụng Mẫu Singleton. Dưới đây là một triển khai:

public interface IRabbitMqConnection
{
    IConnection Connection { get; }
}

public class RabbitMqConnection : IRabbitMqConnection
{
    private static readonly RabbitMqConnection instance = new RabbitMqConnection();
    private readonly ILogger logger;
    private static readonly object syncRoot = new object();
    public IConnection Connection { get; private set; }

    // Constructor riêng tư để ngăn chặn việc khởi tạo từ bên ngoài
    private RabbitMqConnection() { }

    // Constructor riêng tư để khởi tạo kết nối
    private RabbitMqConnection(RabbitMqOptions options, string clientName, ILogger logger)
    {
        this.logger = logger;
        var factory = new ConnectionFactory
        {
            HostName = options.HostName,
            Port = options.Port,
            UserName = options.UserName,
            Password = options.Password,
            VirtualHost = options.VirtualHost,
            AutomaticRecoveryEnabled = true
        };

        // Sử dụng Polly để thử lại khi gặp lỗi kết nối
        var retryPolicy = Policy
            .Handle<SocketException>()
            .WaitAndRetry(3, retryAttempt => TimeSpan.FromSeconds(1), (ex, time) =>
            {
                if (retryAttempt == 3)
                {
                    logger.LogWarning($"Không thể kết nối sau 3 lần thử: {ex.Message}");
                    throw ex;
                }
            });

        retryPolicy.Execute(() =>
        {
            Connection = factory.CreateConnection(clientName);
        });
    }

    // Phương thức công khai để lấy instance duy nhất
    public static RabbitMqConnection GetInstance(RabbitMqOptions options, string clientName, ILogger logger)
    {
        if (instance.Connection == null)
        {
            lock (syncRoot)
            {
                if (instance.Connection == null)
                {
                    new RabbitMqConnection(options, clientName, logger);
                }
            }
        }
        return instance;
    }
}

Lớp trừu tượng cho Dịch vụ Người tiêu dùng Kafka

Triển khai một lớp cơ sở trừu tượng cho các dịch vụ `IHostedService` để tiêu thụ tin nhắn từ Kafka giúp tái sử dụng logic.

public abstract class BaseKafkaConsumerService : IHostedService
{
    protected readonly IOptions<KafkaConfig> kafkaOptions;
    private readonly IKafkaConsumerProvider consumerProvider;
    private IConsumer<string, byte[]> kafkaConsumer;

    protected BaseKafkaConsumerService(IKafkaConsumerProvider consumerProvider, IOptions<KafkaConfig> options)
    {
        this.kafkaOptions = options ?? throw new ArgumentNullException(nameof(options));
        this.consumerProvider = consumerProvider;
    }

    // Cấu hình cho người tiêu dùng
    protected abstract ConsumerConfig GetConsumerConfiguration();
    // Danh sách các chủ đề cần theo dõi
    public abstract List<string> GetSubscribedTopics();
    // Xử lý tin nhắn nhận được
    public abstract void HandleMessage(ConsumeResult<string, byte[]> message);

    public Task StartAsync(CancellationToken cancellationToken)
    {
        var config = GetConsumerConfiguration();
        kafkaConsumer = consumerProvider.CreateConsumer(config);
        kafkaConsumer.Subscribe(GetSubscribedTopics());

        Task.Run(() =>
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                try
                {
                    var consumeResult = kafkaConsumer.Consume(cancellationToken);
                    if (consumeResult.IsPartitionEOF)
                    {
                        Console.WriteLine($"Đã đến cuối chủ đề {consumeResult.Topic}, phân vùng {consumeResult.Partition}, offset {consumeResult.Offset}.");
                        continue;
                    }
                    if (consumeResult != null)
                    {
                        HandleMessage(consumeResult);
                    }
                    try
                    {
                        kafkaConsumer.StoreOffset(consumeResult);
                    }
                    catch (KafkaException ex)
                    {
                        Console.WriteLine($"Lỗi khi lưu offset: {ex.Error.Reason}");
                    }
                }
                catch (OperationCanceledException)
                {
                    // Bỏ qua nếu tác vụ bị hủy
                }
                catch (ConsumeException ex)
                {
                    Console.WriteLine($"Lỗi khi tiêu thụ: {ex.Error.Reason}");
                }
            }
        }, cancellationToken);

        return Task.CompletedTask;
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        kafkaConsumer?.Close();
        kafkaConsumer?.Dispose();
        return Task.CompletedTask;
    }
}

// Triển khai cụ thể 1
public class OrderProcessingConsumer : BaseKafkaConsumerService
{
    public OrderProcessingConsumer(IKafkaConsumerProvider provider, IOptions<KafkaConfig> options) : base(provider, options) { }

    protected override ConsumerConfig GetConsumerConfiguration()
    {
        return new ConsumerConfig
        {
            BootstrapServers = kafkaOptions.Value.BootstrapServers,
            AutoOffsetReset = AutoOffsetReset.Latest,
            GroupId = "order-processor",
            EnableAutoCommit = false
        };
    }

    public override List<string> GetSubscribedTopics()
    {
        return new List<string> { "orders" };
    }

    public override void HandleMessage(ConsumeResult<string, byte[]> message)
    {
        var messageBody = Encoding.UTF8.GetString(message.Message.Value);
        Console.WriteLine($"Đã xử lý tin nhắn mới: {messageBody}");
    }
}

// Triển khai cụ thể 2
public class OrderProcessingConsumerV2 : BaseKafkaConsumerService
{
    public OrderProcessingConsumerV2(IKafkaConsumerProvider provider, IOptions<KafkaConfig> options) : base(provider, options) { }

    protected override ConsumerConfig GetConsumerConfiguration()
    {
        return new ConsumerConfig
        {
            BootstrapServers = kafkaOptions.Value.BootstrapServers,
            AutoOffsetReset = AutoOffsetReset.Latest,
            GroupId = "order-processor-v2",
            EnableAutoCommit = false
        };
    }

    public override List<string> GetSubscribedTopics()
    {
        return new List<string> { "orders" };
    }

    public override void HandleMessage(ConsumeResult<string, byte[]> message)
    {
        var messageBody = Encoding.UTF8.GetString(message.Message.Value);
        Console.WriteLine($"Đã xử lý tin nhắn mới (phiên bản 2): {messageBody}");
    }
}

Đăng ký Dịch vụ Hosted Service theo Lô

Thay vì đăng ký từng dịch vụ một, chúng ta có thể quét các loại triển khai của lớp cơ sở và đăng ký chúng tự động.

public static class ServiceRegistrationExtensions
{
    public static IServiceCollection AddKafkaConsumers(this IServiceCollection services, IConfigurationSection kafkaConfigSection)
    {
        services.Configure<KafkaConfig>(kafkaConfigSection);
        services.AddSingleton<IKafkaConsumerProvider, KafkaConsumerProvider>();

        var baseConsumerType = typeof(BaseKafkaConsumerService);
        // Tìm tất cả các lớp kế thừa từ BaseKafkaConsumerService trong assembly hiện tại
        var consumerTypes = Assembly.GetEntryAssembly()
            .GetExportedTypes()
            .Where(t => t.IsSubclassOf(baseConsumerType) && !t.IsAbstract)
            .ToList();

        foreach (var consumerType in consumerTypes)
        {
            services.AddSingleton(typeof(IHostedService), consumerType);
        }

        return services;
    }

    public static IServiceCollection AddKafkaProducer(this IServiceCollection services, IConfigurationSection kafkaConfigSection)
    {
        services.Configure<KafkaConfig>(kafkaConfigSection);
        services.AddSingleton<IKafkaProducer>(sp =>
        {
            var config = sp.GetRequiredService<IOptions<KafkaConfig>>().Value;
            return KafkaProducer.CreateInstance(producerConfig =>
            {
                producerConfig.BootstrapServers = config.BootstrapServers;
                producerConfig.MessageTimeoutMs = 5000;
                producerConfig.EnableIdempotence = true;
            });
        });

        return services;
    }
}

Thẻ: C# .NET singleton IHostedService kafka

Đăng vào ngày 24 tháng 5 lúc 19:42