Đóng gói thư viện Canal trong .NET Core

Bài viết này sẽ hướng dẫn cách đóng gói và sử dụng Canal trong ứng dụng .NET Core, bao gồm việc tạo service chạy nền, xử lý dữ liệu thay đổi từ cơ sở dữ liệu và cấu hình kết nối.

Service chạy nền

Để lắng nghe và xử lý các thay đổi từ Canal, chúng ta triển khai service nền bằng IHostedService. Service này chịu trách nhiệm kết nối, nhận message và gọi các handler tương ứng.

Mã mở rộng Canal

Lớp CanalExtend cung cấp phương thức mở rộng để đăng ký các thành phần Canal vào DI container:

public static class CanalExtensions
{
    public static IServiceCollection AddCanal(this IServiceCollection services, IConfiguration configuration)
    {
        // Cấu hình options từ appsettings.json
        services.Configure<CanalOptions>(configuration.GetSection("CanalOptions"));
        
        services.AddTransient(p =>
        {
            return p.GetRequiredService<IOptionsMonitor<CanalOptions>>().CurrentValue;
        });
        
        // Đăng ký handler cho Product
        services.AddTransient<CanalHandler<Product>, CanalProductHandler>();
        
        // Đăng ký provider chính
        return services.AddTransient<CanalProvider>();
    }
}

Lớp cấu hình CanalOptions

public class CanalOptions
{
    /// <summary> Địa chỉ host Canal </summary>
    public string Host { get; set; }

    /// <summary> Cổng kết nối </summary>
    public int Port { get; set; }

    /// <summary> ID client </summary>
    public string ClientId { get; set; }

    /// <summary> Subject mặc định </summary>
    public string DefaultSubject { get; set; }
}

Lớp xử lý dữ liệu trừu tượng (CanalHandler<T>)

Đây là lớp cơ bản mà các handler cụ thể kế thừa, cung cấp các phương thức trừu tượng để xử lý Insert, Update, Delete và chuyển đổi dữ liệu:

public abstract class CanalHandler<T>
{
    private RowChange _rowChange;
    public RowChange CurrentRowChange => _rowChange;

    public long CurrentMessageId { get; private set; }
    public CanalProvider CanalProvider { get; private set; }

    public abstract void Delete(T model);
    public abstract void Insert(T model);
    public abstract void Update(T model);
    public abstract T MapModel(List<Column> columns);

    public CanalHandler<T> SetRowChange(RowChange rowChange)
    {
        _rowChange = rowChange;
        return this;
    }

    public void Process(long messageId, CanalProvider provider)
    {
        CurrentMessageId = messageId;
        CanalProvider = provider;

        foreach (var rowData in _rowChange.RowDatas)
        {
            // Dữ liệu sau khi thay đổi
            List<Column> columns = rowData.AfterColumns.ToList();
            T model = MapModel(columns);

            switch (_rowChange.EventType)
            {
                case EventType.Delete:
                    Delete(model);
                    break;
                case EventType.Insert:
                    Insert(model);
                    break;
                case EventType.Update:
                    Update(model);
                    break;
            }
        }
    }
}

Handler cụ thể cho Product

public class CanalProductHandler : CanalHandler<Product>
{
    private readonly ILogger<CanalProductHandler> _logger;

    public CanalProductHandler(ILogger<CanalProductHandler> logger)
    {
        _logger = logger;
    }

    public override Product MapModel(List<Column> columns)
    {
        var product = new Product();
        foreach (var col in columns)
        {
            if (col.Name == nameof(Product.Id))
                product.Id = long.Parse(col.Value);
            else if (col.Name == nameof(Product.Name))
                product.Name = col.Value;
            else if (col.Name == nameof(Product.Price))
                product.Price = Convert.ToDecimal(col.Value);
        }
        return product;
    }

    public override async void Insert(Product model)
    {
        _logger.LogInformation($"Insert: Id={model.Id}, Name={model.Name}, Price={model.Price}");
        await CanalProvider.AcknowledgeAsync(CurrentMessageId);
    }

    public override async void Update(Product model)
    {
        _logger.LogInformation($"Update: Id={model.Id}, Name={model.Name}, Price={model.Price}");
        await CanalProvider.AcknowledgeAsync(CurrentMessageId);
    }

    public override async void Delete(Product model)
    {
        _logger.LogInformation($"Delete: Id={model.Id}, Name={model.Name}, Price={model.Price}");
        await CanalProvider.AcknowledgeAsync(CurrentMessageId);
    }
}

Provider quản lý kết nối Canal

public class CanalProvider
{
    private readonly CanalOptions _options;
    private readonly ILogger _logger;
    private SimpleCanalConnection _connection;

    public CanalProvider(CanalOptions options, ILogger<SimpleCanalConnection> logger)
    {
        _options = options;
        _logger = logger;
        var simpleOptions = new SimpleCanalOptions(options.Host, options.Port, options.ClientId);
        _connection = new SimpleCanalConnection(simpleOptions, _logger);
    }

    public async Task ConnectDefaultAsync()
    {
        await _connection.ConnectAsync();
        await _connection.SubscribeAsync(_options.DefaultSubject);
    }

    public async Task ConnectAsync(string subject)
    {
        await _connection.ConnectAsync();
        await _connection.SubscribeAsync(subject);
    }

    public async Task UnsubscribeDefaultAsync()
    {
        await _connection.UnSubscribeAsync(_options.DefaultSubject);
    }

    public async Task UnsubscribeAsync(string subject)
    {
        await _connection.UnSubscribeAsync(subject);
    }

    public async Task<Message> FetchDefaultMessageAsync()
    {
        return await _connection.GetWithoutAckAsync(1024);
    }

    public async Task<Message> FetchMessageAsync(int batchSize)
    {
        return await _connection.GetWithoutAckAsync(batchSize);
    }

    public async Task AcknowledgeAsync(long messageId)
    {
        await _connection.AckAsync(messageId);
    }

    public async Task RollbackAsync(long messageId)
    {
        await _connection.RollbackAsync(messageId);
    }

    public async Task ShutdownAsync()
    {
        await _connection.DisConnectAsync();
        await _connection.DisposeAsync();
    }
}

Service nền xử lý Canal

public class CanalWorkerService : IHostedService
{
    private readonly CanalHandler<Product> _productHandler;
    private readonly CanalProvider _provider;

    public CanalWorkerService(CanalHandler<Product> productHandler, CanalProvider provider)
    {
        _productHandler = productHandler;
        _provider = provider;
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        await _provider.ConnectDefaultAsync();

        // Chạy vòng lặp bất đồng bộ không chặn luồng chính
        _ = Task.Factory.StartNew(async () =>
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                var message = await _provider.FetchDefaultMessageAsync();
                
                foreach (var entry in message.Entries)
                {
                    // Bỏ qua các entry đánh dấu transaction
                    if (entry.EntryType == EntryType.TransactionBegin || 
                        entry.EntryType == EntryType.TransactionEnd)
                        continue;

                    var tableName = entry.Header.TableName;
                    if (tableName == "Products")
                    {
                        _productHandler
                            .SetRowChange(RowChange.Parser.ParseFrom(entry.StoreValue))
                            .Process(message.Id, _provider);
                    }
                    else if (tableName == "SpareProducts")
                    {
                        // Xử lý cho bảng khác
                    }
                }
            }
        }, cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        await _provider.ShutdownAsync();
    }
}

Cấu hình trong appsettings.json

{
  "CanalOptions": {
    "Host": "127.0.0.1",
    "Port": 11111,
    "ClientId": "101",
    "DefaultSubject": "testhigh.Products,testhigh.SpareProducts"
  }
}

Đăng ký trong Program.cs

// Đăng ký dịch vụ Canal
builder.Services.AddCanal(builder.Configuration);

// Thêm worker service chạy nền
builder.Services.AddHostedService<CanalWorkerService>();

Kiểm thử

Sau khi khởi động ứng dụng, thực hiện thay đổi dữ liệu trong bảng Products (ví dụ: sửa tên sản phẩm "长虹电视" thành "Samsung TV"). Kết quả log cho thấy handler đã bắt và xử lý thành công thay đổi này.

Thẻ: .net core canal MySQL binlog IHostedService Dependency Injection

Đăng vào ngày 25 tháng 6 lúc 16:21