Bài viết này sẽ hướng dẫn cách đóng gói và sử dụng Canal trong ứng dụng .NET Core, bao gồm việc tạo service chạy nền, xử lý dữ liệu thay đổi từ cơ sở dữ liệu và cấu hình kết nối.
Service chạy nền
Để lắng nghe và xử lý các thay đổi từ Canal, chúng ta triển khai service nền bằng IHostedService. Service này chịu trách nhiệm kết nối, nhận message và gọi các handler tương ứng.
Mã mở rộng Canal
Lớp CanalExtend cung cấp phương thức mở rộng để đăng ký các thành phần Canal vào DI container:
public static class CanalExtensions
{
public static IServiceCollection AddCanal(this IServiceCollection services, IConfiguration configuration)
{
// Cấu hình options từ appsettings.json
services.Configure<CanalOptions>(configuration.GetSection("CanalOptions"));
services.AddTransient(p =>
{
return p.GetRequiredService<IOptionsMonitor<CanalOptions>>().CurrentValue;
});
// Đăng ký handler cho Product
services.AddTransient<CanalHandler<Product>, CanalProductHandler>();
// Đăng ký provider chính
return services.AddTransient<CanalProvider>();
}
}
Lớp cấu hình CanalOptions
public class CanalOptions
{
/// <summary> Địa chỉ host Canal </summary>
public string Host { get; set; }
/// <summary> Cổng kết nối </summary>
public int Port { get; set; }
/// <summary> ID client </summary>
public string ClientId { get; set; }
/// <summary> Subject mặc định </summary>
public string DefaultSubject { get; set; }
}
Lớp xử lý dữ liệu trừu tượng (CanalHandler<T>)
Đây là lớp cơ bản mà các handler cụ thể kế thừa, cung cấp các phương thức trừu tượng để xử lý Insert, Update, Delete và chuyển đổi dữ liệu:
public abstract class CanalHandler<T>
{
private RowChange _rowChange;
public RowChange CurrentRowChange => _rowChange;
public long CurrentMessageId { get; private set; }
public CanalProvider CanalProvider { get; private set; }
public abstract void Delete(T model);
public abstract void Insert(T model);
public abstract void Update(T model);
public abstract T MapModel(List<Column> columns);
public CanalHandler<T> SetRowChange(RowChange rowChange)
{
_rowChange = rowChange;
return this;
}
public void Process(long messageId, CanalProvider provider)
{
CurrentMessageId = messageId;
CanalProvider = provider;
foreach (var rowData in _rowChange.RowDatas)
{
// Dữ liệu sau khi thay đổi
List<Column> columns = rowData.AfterColumns.ToList();
T model = MapModel(columns);
switch (_rowChange.EventType)
{
case EventType.Delete:
Delete(model);
break;
case EventType.Insert:
Insert(model);
break;
case EventType.Update:
Update(model);
break;
}
}
}
}
Handler cụ thể cho Product
public class CanalProductHandler : CanalHandler<Product>
{
private readonly ILogger<CanalProductHandler> _logger;
public CanalProductHandler(ILogger<CanalProductHandler> logger)
{
_logger = logger;
}
public override Product MapModel(List<Column> columns)
{
var product = new Product();
foreach (var col in columns)
{
if (col.Name == nameof(Product.Id))
product.Id = long.Parse(col.Value);
else if (col.Name == nameof(Product.Name))
product.Name = col.Value;
else if (col.Name == nameof(Product.Price))
product.Price = Convert.ToDecimal(col.Value);
}
return product;
}
public override async void Insert(Product model)
{
_logger.LogInformation($"Insert: Id={model.Id}, Name={model.Name}, Price={model.Price}");
await CanalProvider.AcknowledgeAsync(CurrentMessageId);
}
public override async void Update(Product model)
{
_logger.LogInformation($"Update: Id={model.Id}, Name={model.Name}, Price={model.Price}");
await CanalProvider.AcknowledgeAsync(CurrentMessageId);
}
public override async void Delete(Product model)
{
_logger.LogInformation($"Delete: Id={model.Id}, Name={model.Name}, Price={model.Price}");
await CanalProvider.AcknowledgeAsync(CurrentMessageId);
}
}
Provider quản lý kết nối Canal
public class CanalProvider
{
private readonly CanalOptions _options;
private readonly ILogger _logger;
private SimpleCanalConnection _connection;
public CanalProvider(CanalOptions options, ILogger<SimpleCanalConnection> logger)
{
_options = options;
_logger = logger;
var simpleOptions = new SimpleCanalOptions(options.Host, options.Port, options.ClientId);
_connection = new SimpleCanalConnection(simpleOptions, _logger);
}
public async Task ConnectDefaultAsync()
{
await _connection.ConnectAsync();
await _connection.SubscribeAsync(_options.DefaultSubject);
}
public async Task ConnectAsync(string subject)
{
await _connection.ConnectAsync();
await _connection.SubscribeAsync(subject);
}
public async Task UnsubscribeDefaultAsync()
{
await _connection.UnSubscribeAsync(_options.DefaultSubject);
}
public async Task UnsubscribeAsync(string subject)
{
await _connection.UnSubscribeAsync(subject);
}
public async Task<Message> FetchDefaultMessageAsync()
{
return await _connection.GetWithoutAckAsync(1024);
}
public async Task<Message> FetchMessageAsync(int batchSize)
{
return await _connection.GetWithoutAckAsync(batchSize);
}
public async Task AcknowledgeAsync(long messageId)
{
await _connection.AckAsync(messageId);
}
public async Task RollbackAsync(long messageId)
{
await _connection.RollbackAsync(messageId);
}
public async Task ShutdownAsync()
{
await _connection.DisConnectAsync();
await _connection.DisposeAsync();
}
}
Service nền xử lý Canal
public class CanalWorkerService : IHostedService
{
private readonly CanalHandler<Product> _productHandler;
private readonly CanalProvider _provider;
public CanalWorkerService(CanalHandler<Product> productHandler, CanalProvider provider)
{
_productHandler = productHandler;
_provider = provider;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
await _provider.ConnectDefaultAsync();
// Chạy vòng lặp bất đồng bộ không chặn luồng chính
_ = Task.Factory.StartNew(async () =>
{
while (!cancellationToken.IsCancellationRequested)
{
var message = await _provider.FetchDefaultMessageAsync();
foreach (var entry in message.Entries)
{
// Bỏ qua các entry đánh dấu transaction
if (entry.EntryType == EntryType.TransactionBegin ||
entry.EntryType == EntryType.TransactionEnd)
continue;
var tableName = entry.Header.TableName;
if (tableName == "Products")
{
_productHandler
.SetRowChange(RowChange.Parser.ParseFrom(entry.StoreValue))
.Process(message.Id, _provider);
}
else if (tableName == "SpareProducts")
{
// Xử lý cho bảng khác
}
}
}
}, cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
public async Task StopAsync(CancellationToken cancellationToken)
{
await _provider.ShutdownAsync();
}
}
Cấu hình trong appsettings.json
{
"CanalOptions": {
"Host": "127.0.0.1",
"Port": 11111,
"ClientId": "101",
"DefaultSubject": "testhigh.Products,testhigh.SpareProducts"
}
}
Đăng ký trong Program.cs
// Đăng ký dịch vụ Canal builder.Services.AddCanal(builder.Configuration); // Thêm worker service chạy nền builder.Services.AddHostedService<CanalWorkerService>();
Kiểm thử
Sau khi khởi động ứng dụng, thực hiện thay đổi dữ liệu trong bảng Products (ví dụ: sửa tên sản phẩm "长虹电视" thành "Samsung TV"). Kết quả log cho thấy handler đã bắt và xử lý thành công thay đổi này.