Triển Khai Giao Dịch Phân Tán Trong Microservices .NET Sử Dụng CAP Và RabbitMQ

Giới thiệu về tính nhất quán dữ liệu trong kiến trúc phân tán

Khi hệ thống phát triển thành kiến trúc microservices, áp lực lên cơ sở dữ liệu tăng lên đáng kể. Để giải quyết vấn đề này, chúng ta thường thực hiện chia tách cơ sở dữ liệu theo chiều ngang (sharding) hoặc chiều dọc (tách bảng theo chức năng). Tuy nhiên, việc phân tách này dẫn đến thách thức lớn về tính nhất quán dữ liệu giữa các dịch vụ. Trong môi trường phân tán, giải pháp phổ biến để đảm bảo tính nhất quán cuối cùng (Eventual Consistency) là sử dụng mẫu giao dịch phân tán thông qua message queue.

Bài viết này sẽ hướng dẫn cách tích hợp DotNetCore.CAP cùng với RabbitMQMySQL để xử lý các giao dịch phân tán trong .NET Core.

Cài đặt hạ tầng Message Queue

Trước tiên, cần triển khai một instance RabbitMQ để làm trung gian truyền tải sự kiện. Có thể sử dụng Docker để khởi tạo nhanh chóng:

docker run -d --name mq-server -e RABBITMQ_DEFAULT_USER=cap_user -e RABBITMQ_DEFAULT_PASS=cap_pass -p 15672:15672 -p 5672:5672 rabbitmq:management

Sau khi container chạy, truy cập vào trang quản lý tại http://localhost:15672/ để xác minh trạng thái hoạt động của RabbitMQ.

Cài đặt các gói thư viện cần thiết

Thêm các package NuGet sau vào dự án .NET Core của bạn:

  • DotNetCore.CAP
  • DotNetCore.CAP.Dashboard
  • DotNetCore.CAP.MySql
  • DotNetCore.CAP.RabbitMQ

Lưu ý quan trọng: Để cơ chế Transactional Outbox hoạt động chính xác, bảng lưu trữ sự kiện của CAP cần nằm chung cơ sở dữ liệu với dữ liệu nghiệp vụ của dịch vụ đó.

Cấu hình dịch vụ sản xuất sự kiện (Order Service)

Trong file Program.cs, cấu hình CAP để kết nối với RabbitMQ và MySQL. Chúng ta sẽ thiết lập chính sách retry khi việc gửi消息 thất bại.

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddControllers();
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

// Cấu hình DbContext
var connectionString = builder.Configuration.GetConnectionString("DefaultConnection");
builder.Services.AddDbContext<AppDbContext>(options => options.UseMySql(connectionString));

// Cấu hình CAP
builder.Services.AddCap(options =>
{
    options.FailedRetryCount = 5;
    options.FailedRetryInterval = 30;
    options.FailedThresholdCallback = failed =>
    {
        var logger = failed.ServiceProvider.GetService<ILogger<Program>>();
        logger?.LogError($"Sự kiện {failed.MessageType} thất bại sau {options.FailedRetryCount} lần重试.");
    };

    options.UseRabbitMQ(cfg =>
    {
        cfg.HostName = "localhost";
        cfg.Port = 5672;
        cfg.UserName = "cap_user";
        cfg.Password = "cap_pass";
    });

    options.UseMySql(connectionString);
    options.UseDashboard();
});

var app = builder.Build();

if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

app.UseHttpsRedirection();
app.UseAuthorization();
app.MapControllers();
app.Run();

Tệp cấu hình appsettings.json sẽ chứa chuỗi kết nối:

{
  "ConnectionStrings": {
    "DefaultConnection": "server=localhost;port=3306;database=order_db;uid=root;pwd=secret;"
  },
  "Logging": {
    "LogLevel": {
      "Default": "Information"
    }
  }
}

Triển khai Controller gửi sự kiện

Tạo Controller để xử lý logic tạo đơn hàng và gửi sự kiện thông báo. Sử dụng ICapPublisher để publish message.

[Route("api/[controller]")]
[ApiController]
public class OrderController : ControllerBase
{
    private readonly ICapPublisher _eventPublisher;
    private readonly AppDbContext _dbContext;
    private const string TopicOrderCreated = "services.order.created";

    public OrderController(ICapPublisher publisher, AppDbContext context)
    {
        _eventPublisher = publisher;
        _dbContext = context;
    }

    [HttpGet("create/{id}")]
    public async Task<IActionResult> CreateOrderAsync(long id)
    {
        var order = new Order { Id = id, Status = OrderStatus.Pending };
        
        // Gửi sự kiện thông thường (không kèm transaction DB)
        await _eventPublisher.PublishAsync(TopicOrderCreated, order);
        
        return Ok(order);
    }
}

Cấu hình dịch vụ tiêu thụ sự kiện (Payment Service)

Dịch vụ thanh toán cũng cần cấu hình CAP tương tự để lắng nghe sự kiện từ RabbitMQ. Đảm bảo chuỗi kết nối数据库 trỏ đúng nơi lưu trữ bảng cap.received.

// Cấu hình trong Program.cs của Payment Service
builder.Services.AddCap(options =>
{
    options.FailedRetryCount = 5;
    options.FailedRetryInterval = 30;
    // ... cấu hình RabbitMQ và MySQL tương tự như trên
    options.UseRabbitMQ("localhost");
    options.UseMySql(connectionString);
    options.UseDashboard();
});

Xử lý sự kiện trong Consumer

Sử dụng attribute [CapSubscribe] để đăng ký method lắng nghe topic. Attribute [NonAction] là bắt buộc để tránh conflict với routing của ASP.NET Core.

[Route("api/[controller]")]
[ApiController]
public class PaymentController : ControllerBase
{
    private readonly ILogger<PaymentController> _logger;

    public PaymentController(ILogger<PaymentController> logger)
    {
        _logger = logger;
    }

    [NonAction]
    [CapSubscribe("services.order.created", Group = "payment.group.v1")]
    public void HandleOrderCreated(Order order)
    {
        // Xử lý logic thanh toán
        order.Status = OrderStatus.Paid;
        _logger.LogInformation($"Đã xử lý thanh toán cho đơn hàng {order.Id}");
    }

    [NonAction]
    [CapSubscribe("services.order.created", Group = "analytics.group.v1")]
    public void HandleOrderForAnalytics(Order order)
    {
        // Một topic có thể có nhiều consumer group khác nhau
        _logger.LogInformation($"Ghi nhận dữ liệu phân tích cho đơn hàng {order.Id}");
    }
}

Lưu ý rằng queue trong RabbitMQ sẽ được tạo tự động khi consumer khởi động lần đầu. Các bảng cap.publishedcap.received cũng sẽ được CAP tự động tạo trong database để lưu trữ trạng thái sự kiện.

Thực hiện Giao Dịch Phân Tán (Transactional Outbox)

Để đảm bảo dữ liệu được lưu vào DB và sự kiện được gửi đi cùng lúc (atomic), sử dụng phương thức mở rộng BeginTransaction của CAP. Điều này yêu cầu bảng CAP và bảng nghiệp vụ phải cùng một database connection.

[HttpGet("create-transaction/{id}")]
public async Task<IActionResult> CreateOrderTransactionalAsync(long id)
{
    var order = new Order { Id = id, Status = OrderStatus.Pending };
    const string TopicTrans = "services.order.trans.created";

    // Bắt đầu transaction bao gồm cả việc lưu DB và gửi message
    using (var transaction = _dbContext.Database.BeginTransaction(_eventPublisher, autoCommit: false))
    {
        _dbContext.Orders.Add(order);
        await _dbContext.SaveChangesAsync();
        
        // Message sẽ chỉ được gửi khi transaction commit thành công
        await _eventPublisher.PublishAsync(TopicTrans, order);
        
        transaction.Commit();
    }

    return Ok("Đơn hàng đã được lưu và sự kiện đã được lên lịch gửi");
}

Trong dịch vụ Payment, tạo thêm method subscribe cho topic giao dịch này:

[NonAction]
[CapSubscribe("services.order.trans.created", Group = "payment.trans.group")]
public void HandleTransactionalOrder(Order order)
{
    order.Status = OrderStatus.Paid;
    _logger.LogWarning($"Xử lý giao dịch phân tán thành công cho đơn hàng {order.Id}");
}

Truyền tải Metadata qua Header

CAP hỗ trợ truyền thêm thông tin metadata qua header của message. Điều này hữu ích cho việc tracing hoặc truyền thông tin user context.

Phía Producer:

var headers = new Dictionary<string, string>
{
    { "x-user-id", "12345" },
    { "x-request-source", "web-client" }
};

await _eventPublisher.PublishAsync(TopicOrderCreated, order, headers);

Phía Consumer:

[NonAction]
[CapSubscribe("services.order.created")]
public void HandleOrderWithHeader(Order order, [FromCap] CapHeader headers)
{
    var userId = headers["x-user-id"];
    _logger.LogInformation($"Nhận đơn hàng từ user {userId}");
}

Truy cập vào dashboard của CAP tại https://localhost:5001/cap (tùy port cấu hình) để theo dõi trạng thái của các message đã gửi và nhận.

Thẻ: dotnet-core cap rabbitmq distributed-transaction mysql

Đăng vào ngày 13 tháng 6 lúc 17:15