RabbitMQ - Hệ Thống Giao Tiếp Tin Nhắn

I. Kiến Thức Cơ Bản

1. RabbitMQ là gì

RabbitMQ được phát hành vào năm 2007, là một hệ thống hoàn thiện dựa trên AMQP (Giao thức Hàng đợi Tin nhắn Nâng cao), viết tắt là MQ. Toàn danh là Message Queue (Hàng đợi Tin nhắn). MQ là một phương pháp giao tiếp giữa ứng dụng và ứng dụng, được phát triển bằng ngôn ngữ Erlang (ngôn ngữ chuyên dụng cho xử lý dữ liệu lớn và đồng thời cao). Đây là một hệ thống tin nhắn doanh nghiệp có thể tái sử dụng và là một trong những middleware tin nhắn phổ biến nhất hiện nay, có độ tin cậy cao, định tuyến linh hoạt, đơn giản hóa cụm, hàng đợi có độ sẵn sàng cao, hỗ trợ nhiều giao thức, giao diện quản lý, cơ chế theo dõi và cơ chế plugin.

2. Tin nhắn và Hàng đợi là gì
  1. Tin nhắn là dữ liệu, dữ liệu được thêm, xóa, sửa, tìm kiếm. Ví dụ, trong hệ thống quản lý nhân viên, dữ liệu được thêm, xóa, sửa, tìm kiếm.

  2. Hàng đợi là một đầu vào dữ liệu và một đầu ra dữ liệu, ví dụ như cấu trúc dữ liệu Queue trong C#.

3. Hàng đợi Tin nhắn là gì
  1. Hàng đợi tin nhắn: một đầu vào tin nhắn, một đầu ra tin nhắn.

  2. RabbitMQ là một thành phần thực hiện khái niệm hàng đợi tin nhắn. Theo tư duy hướng đối tượng, hàng đợi tin nhắn là lớp, còn RabbitMQ là một thể hiện. Tất nhiên không chỉ có RabbitMQ, ví dụ như ActiveMQ, RocketMQ, Kafka, bao gồm cả Redis cũng có thể thực hiện hàng đợi tin nhắn.

4. Nơi nào sử dụng RabbitMQ
  1. Trong kiến trúc đơn thể phổ biến, quy trình chính là người dùng thực hiện thao tác UI gửi yêu cầu HTTP > máy chủ xử lý > sau đó máy chủ tương tác trực tiếp với cơ sở dữ liệu và phản hồi đồng bộ cho người dùng.

  2. Trong kiến trúc vi dịch vụ, ví dụ như hệ thống quản lý nhân viên trong hình, giao tiếp giữa UI và vi dịch vụ chủ yếu thông qua HTTP hoặc gRPC đồng bộ.

Phân tích vấn đề

Trong hai trường hợp trên, chúng ta thấy rằng trong yêu cầu UI đều là thao tác đồng bộ. Kiến trúc thứ hai mặc dù chia tách toàn bộ dịch vụ theo nghiệp vụ thành các vi dịch vụ khác nhau và có cơ sở dữ liệu riêng tương ứng, nhưng vấn đề vẫn chưa được giải quyết khi người dùng giao tiếp với vi dịch vụ, ví dụ như khả năng chịu tải của cơ sở dữ liệu chỉ có thể xử lý 100.000 yêu cầu, nếu gặp tình huống đồng thời cao, UI gửi 500.000 yêu cầu, cơ sở dữ liệu sẽ không thể chịu được, dẫn đến các vấn đề sau.

  1. Yêu cầu đồng thời cao dẫn đến hệ thống giảm hiệu suất, phản hồi chậm, đồng thời rủi ro chịu tải của cơ sở dữ liệu tăng lên.

  2. Khả năng mở rộng không mạnh, tương tác của UI với nghiệp vụ phụ thuộc lớn, dẫn đến trải nghiệm người dùng giảm.

  3. Nếu lưu lượng truy cập đột ngột tăng vọt, máy chủ có thể bị treo.

Giải pháp

  • Để giải quyết vấn đề điểm nghẽn hiệu suất, chúng ta cần chuyển đổi giao tiếp đồng bộ thành giao tiếp không đồng bộ. Do đó, chúng ta sử dụng hàng đợi tin nhắn, người dùng thực hiện thao tác trực tiếp trong UI và ghi vào RabbitMQ, sau đó trả về, các thao tác nghiệp vụ còn lại được hoàn thành bởi hàng đợi tin nhắn và các vi dịch vụ tương ứng.

Ưu điểm của RabbitMQ

  1. Xử lý không đồng bộ, phản hồi nhanh, tăng khả năng chịu tải của cơ sở dữ liệu (máy chủ).

  2. Giảm đỉnh, có thể phân tán lưu lượng cao đến các khoảng thời gian khác nhau để xử lý.

  3. Giải coupling (khả năng mở rộng mạnh hơn), cho phép UI và nghiệp vụ phát triển độc lập.

  4. Độ sẵn sàng cao, nếu bộ xử lý gặp sự cố, nó không ảnh hưởng đến các bộ xử lý khác.

Nhược điểm của RabbitMQ

  1. Tăng độ phức tạp của hệ thống, không thuận tiện để gỡ lỗi và phát triển, trước khi sử dụng RabbitMQ, phía trước tương tác trực tiếp với dịch vụ, bây giờ thêm một lớp.

  2. Giảm độ tức thì, trong một mức độ nào đó, nó cải thiện trải nghiệm người dùng, nhưng cũng giảm trải nghiệm người dùng, nhưng không thể tránh khỏi, phải chấp nhận ưu và nhược điểm.

  3. Phụ thuộc nhiều hơn vào hàng đợi tin nhắn.

5. Các khái niệm cấu thành RabbitMQ
  1. ConnectionFactory là nhà máy sản xuất Connection.

  2. Connection là kết nối socket của RabbitMQ, nó đóng gói phần logic liên quan đến giao thức socket.

  3. Channel là giao diện quan trọng nhất để chúng ta tương tác với RabbitMQ, hầu hết các thao tác nghiệp vụ của chúng ta được thực hiện trong giao diện Channel này, bao gồm định nghĩa Queue, định nghĩa Exchange, liên kết Queue với Exchange, xuất bản tin nhắn, v.v.

  4. Exchange (Bộ chuyển đổi) Chúng ta thường nghĩ rằng nhà sản xuất gửi tin nhắn đến Queue, nhưng thực tế, nhà sản xuất gửi tin nhắn đến Exchange, và Exchange định tuyến tin nhắn đến một hoặc nhiều Queue (hoặc bỏ qua). Trong RabbitMQ, Exchange có tổng cộng 4 chiến lược, lần lượt là: fanout (quạt), direct (trực tiếp), topic (chủ đề), headers (đầu).

II. Cách triển khai RabbitMQ

1. Cài đặt môi trường RabbitMQ
  1. Tải xuống RabbitMQ

  2. Chạy môi trường erlang

  3. Sau khi cài đặt xong, tải plugin quản lý RabbitMQ

rabbitmq-plugins enable rabbitmq_management

  1. Cài đặt thành công và truy cập trang quản trị RabbitMQ http://localhost:15672
2. Tạo hệ thống nghiệp vụ
  1. Tạo dịch vụ chấm công, dịch vụ xin nghỉ phép, dịch vụ tính lương, dịch vụ email, dịch vụ tin nhắn SMS làm vai trò người tiêu thụ.

  2. Tạo trang web quản lý nhân viên để mô phỏng lời gọi phía trước, chủ yếu đóng vai trò là nhà sản xuất.

  3. Trong trang web quản lý nhân viên và mỗi dịch vụ mô phỏng, giới thiệu RabbitMQ.Client qua nuget.

  4. Trong trang web quản lý nhân viên, tạo bộ điều khiển mô phỏng thêm chấm công và thêm mã nhà sản xuất

 // Tạo kết nối
 using (var connection = factory.CreateConnection())
 {
     // Tạo kênh
     var channel = connection.CreateModel();
     // Định nghĩa hàng đợi
     channel.QueueDeclare("ThemChamCong", false, false, false, null);

     string json = JsonConvert.SerializeObject(dtoChamCong);

     // Tạo đối tượng nội dung
     var properties = channel.CreateBasicProperties();
     // Gửi tin nhắn
     channel.BasicPublish(exchange: "", routingKey: "ThemChamCong", basicProperties: properties, body: Encoding.UTF8.GetBytes(json));
 }


  1. Trong dịch vụ chấm công vi dịch vụ, tạo giao diện và thêm mã người tiêu thụ trong giao diện
var connection = factory.CreateConnection();
var channel = connection.CreateModel();   
// Tạo sự kiện người tiêu thụ
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var body = ea.Body;
    // 1. Mã logic, thêm vào cơ sở dữ liệu
    var message = Encoding.UTF8.GetString(body.ToArray());
    object json = JsonConvert.DeserializeObject(message);
    Console.WriteLine(" [x] Tạo thông tin chấm công {0}", message);
};
// Đặt thuộc tính người tiêu thụ
// p1. Lắng nghe hàng đợi p2. Xác nhận tin nhắn ACK p3. Gán giá trị cho thể hiện người tiêu thụ
channel.BasicConsume(queue: "ThemChamCong", autoAck: false, consumer: consumer);


III. Exchange và Phân tích Ví dụ

1. Fanout Exchange (Bộ chuyển đổi quạt)

Quy tắc định tuyến của Exchange loại fanout rất đơn giản, hoạt động tương tự như đa phát một-nhiều, nó sẽ định tuyến tất cả các tin nhắn được gửi đến Exchange này đến tất cả các Queue được liên kết với nó.

  1. Nhà sản xuất một Exchange tương ứng với nhiều Queue, hoặc không khai báo Queue.

  2. Người tiêu thụ định nghĩa Exchange, nếu nhà sản xuất đã định nghĩa Queue, thì phải liên kết exchange và queue, nếu không định nghĩa hàng đợi, thì người tiêu thụ tự khai báo một Queue ngẫu nhiên để nhận và tiêu thụ tin nhắn.

Ví dụ nghiệp vụ

Khi có nhân viên cần xin nghỉ phép, trong hệ thống quản lý nhân viên nộp đơn xin nghỉ phép, nhưng theo quy định của công ty, nhân viên thông thường xin nghỉ phép cần gửi tin nhắn SMS đến cấp trên của họ, đối với kịch bản nghiệp vụ này, chúng ta cần gọi dịch vụ xin nghỉ phép đồng thời gửi tin nhắn, lúc này cần hai người tiêu thụ (dịch vụ xin nghỉ phép, dịch vụ tin nhắn) để tiêu thụ cùng một tin nhắn, thực chất là ghi một tin nhắn vào RabbitMQ có thể được nhiều người tiêu thụ nhận, vì vậy có thể sử dụng Bộ chuyển đổi quạt, một nhà sản xuất, nhiều người tiêu thụ.

Mô phỏng nhà sản xuất sử dụng bộ điều khiển để gọi

[HttpPost]
public IEnumerable<bool> TaoNghiPhep(TaoNghiPhepDto taoNghiPhepDto)
{
    var factory = new ConnectionFactory()
    {
        HostName = "192.168.0.106",
        Port = 5672,
        Password = "guest",
        UserName = "guest",
        VirtualHost = "/"
    };
    using (var connection = factory.CreateConnection())
    {
        var channel = connection.CreateModel();
        // Định nghĩa bộ chuyển đổi
        channel.ExchangeDeclare(exchange: "NghiPhep_fanout", type: "fanout");
        string productJson = JsonConvert.SerializeObject(taoNghiPhepDto);
        var body = Encoding.UTF8.GetBytes(productJson);
        var properties = channel.CreateBasicProperties();
        // Đặt tin nhắn bền vững
        properties.Persistent = true;

        channel.BasicPublish(exchange: "NghiPhep_fanout", routingKey: "",  basicProperties: properties, body: body);
    }

}


Người tiêu thụ thực hiện giao diện IHostedService để tạo một máy chủ lắng nghe

public class DichVuRabbitmq : IHostedService
{
	  public Task StartAsync(CancellationToken cancellationToken)
      {
            var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                Port = 5672,
                Password = "guest",
                UserName = "guest",
                VirtualHost = "/"
            };
           	var connection = factory.CreateConnection();
            var channel = connection.CreateModel();

            // 1. Định nghĩa bộ chuyển đổi
            channel.ExchangeDeclare(exchange: "NghiPhep_fanout", type: ExchangeType.Fanout);
            // Định nghĩa hàng đợi ngẫu nhiên
            var tenHangDoi = channel.QueueDeclare().QueueName;	   
            // Liên kết hàng đợi và bộ chuyển đổi
            channel.QueueBind(tenHangDoi, "NghiPhep_fanout", routingKey: "");

           var consumer = new EventingBasicConsumer(channel);
           consumer.Received += (model, ea) =>
           {
               Console.WriteLine($"model:{model}");
               var body = ea.Body;
               // 1. Mã nghiệp vụ
               var message = Encoding.UTF8.GetString(body.ToArray());
               Console.WriteLine(" [x] Tạo đơn xin nghỉ phép {0}", message);

               // 1. Khuyết điểm của cơ chế xác nhận tự động, tin nhắn có được thêm vào cơ sở dữ liệu một cách bình thường không, vì vậy cần sử dụng xác nhận thủ công
               channel.BasicAck(ea.DeliveryTag, true);
          };
          
          // Qos (ngăn chặn nhiều người tiêu thụ, năng lực không đồng nhất, gây ra vấn đề chất lượng hệ thống.
          // Mỗi lần một người tiêu thụ chỉ tiêu thụ thành công một)
          channel.BasicQos(0, 1, false);
           // Xác nhận tin nhắn (ngăn chặn việc tiêu thụ tin nhắn thất bại)
          channel.BasicConsume(queue: tenHangDoi ,autoAck: false, consumer: consumer);
      }
      
      public Task StopAsync(CancellationToken cancellationToken)
      {
         // 1. Đóng kết nối rabbitmq
         throw new NotImplementedException();
      }
}



2. Direct Exchange (Bộ chuyển đổi trực tiếp)

Bộ chuyển đổi trực tiếp, hoạt động tương tự như phát sóng một-một, Exchange sẽ gửi tin nhắn đến Queue hoàn toàn khớp với ROUTING_KEY, nhược điểm là không thể thực hiện nhiều nhà sản xuất cho một người tiêu thụ.

  1. Nhà sản xuất một Exchange tương ứng với một routingKey được liên kết, cũng có thể khai báo hàng đợi và liên kết, sau đó gửi tin nhắn đến hàng đợi được chỉ định.

  2. Người tiêu thụ cần định nghĩa Exchange và routingKey, nếu nhà sản xuất đã khai báo và liên kết hàng đợi, thì người tiêu thụ phải liên kết Queue được chỉ định bởi nhà sản xuất để nhận tin nhắn, nếu không chỉ định Queue, thì người tiêu thụ cần tự khai báo một Queue ngẫu nhiên rồi liên kết để nhận tin nhắn.

Khi hệ thống quản lý nhân viên của chúng ta cần tính lương và gửi kết quả cho nhân viên bằng tin nhắn, lúc này chúng ta không phù hợp để sử dụng "Bộ chuyển đổi quạt", vì nếu là bạn, bạn cũng không muốn cả công ty biết lương của mình, đúng không? Lúc này cần một kịch bản một-một được tùy chỉnh, thì hãy sử dụng Bộ chuyển đổi trực tiếp để gửi người tiêu thụ được chỉ định theo routingKey khi sản xuất tin nhắn.

Mô phỏng nhà sản xuất sử dụng bộ điều khiển để gọi

public IEnumerable<bool> GuiTinhLuong(TinhLuongDto tinhLuongDto)
{
 var factory = new ConnectionFactory()
 {
     HostName = "192.168.0.106",
     Port = 5672,
     Password = "admin",
     UserName = "admin",
     VirtualHost = "/"
 };

using (var connection = factory.CreateConnection())
 {
     var channel = connection.CreateModel();
     //2. Định nghĩa bộ chuyển đổi
     channel.ExchangeDeclare(exchange: "TinhLuong_direct", type: "direct");

     string tinhLuongDtoJson = JsonConvert.SerializeObject(tinhLuongDto);
     var body = Encoding.UTF8.GetBytes(tinhLuongDtoJson);

     //3. Gửi tin nhắn
     var properties = channel.CreateBasicProperties();
     properties.Persistent = true; // Đặt tin nhắn bền vững
     //p1 Chỉ định bộ chuyển đổi
     //p2 routingKey 
     channel.BasicPublish(exchange: "TinhLuong_direct", routingKey: "nhan-vien-sms", basicProperties: properties, body: body);
 }
}

Người tiêu thụ thực hiện giao diện IHostedService để tạo một máy chủ lắng nghe

public class DichVuRabbitmq : IHostedService
{
      public Task StartAsync(CancellationToken cancellationToken)
      {
         var factory = new ConnectionFactory()
         {
             HostName = "localhost",
             Port = 5672,
             Password = "guest",
             UserName = "guest",
             VirtualHost = "/"
         };
           
	var connection = factory.CreateConnection();
	var channel = connection.CreateModel();

	// 1. Định nghĩa bộ chuyển đổi
	channel.ExchangeDeclare(exchange: "TinhLuong_direct", type: ExchangeType.Direct);

	// 2. Định nghĩa hàng đợi ngẫu nhiên
	var tenHangDoi = channel.QueueDeclare().QueueName;

	// 3. Liên kết hàng đợi với bộ chuyển đổi
	channel.QueueBind(tenHangDoi, "TinhLuong_direct", routingKey: "nhan-vien-sms");

	var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            Console.WriteLine($"model:{model}");
            var body = ea.Body;
            // 1. Mã nghiệp vụ
            var message = Encoding.UTF8.GetString(body.ToArray());
            Console.WriteLine(" [x] Gửi tin nhắn {0}", message);
            // 1. Tin nhắn có được thêm vào cơ sở dữ liệu một cách bình thường không, vì vậy cần sử dụng xác nhận thủ công
            channel.BasicAck(ea.DeliveryTag, true);
        };
            // 3. Tiêu thụ tin nhắn
            channel.BasicQos(0, 1, false); // Qos (ngăn chặn nhiều người tiêu thụ, năng lực không đồng nhất, gây ra vấn đề chất lượng hệ thống.
            // autoAck đặt thành false không thực hiện xác nhận tự động                     
            channel.BasicConsume(queue: tenHangDoi, autoAck: false, consumer: consumer);
      }
      
      public Task StopAsync(CancellationToken cancellationToken)
      {
         // 1. Đóng kết nối rabbitmq
         throw new NotImplementedException();
      }
}

3. Topic Exchange (Bộ chuyển đổi chủ đề)

Khi liên kết Exchange với hàng đợi, cần chỉ định Key; Key có thể có quy tắc của riêng nó; Key có thể có ký tự đại diện; * hoặc #, * chỉ khớp một từ, # khớp nhiều từ trở lên, trên cơ sở Direct thêm khớp mờ; nhiều nhà sản xuất một người tiêu thụ, có thể nhiều-đối-nhiều, cũng có thể nhiều-đối-1, trong dự án thực, sử dụng bộ chuyển đổi chủ đề. Có thể đáp ứng tất cả các kịch bản.

  1. Nhà sản xuất định nghĩa Exchange, sau đó liên kết với các routingKey khác nhau.

  2. Người tiêu thụ định nghĩa Exchange, nếu nhà sản xuất đã định nghĩa Queue, thì phải liên kết exchange, queue và routingKey, nếu không định nghĩa hàng đợi, thì người tiêu thụ tự khai báo một Queue ngẫu nhiên để nhận và tiêu thụ tin nhắn.

  3. Khớp mờ routingKey của người tiêu thụ, khi nhà sản xuất gửi tin nhắn, routingKey được định nghĩa bắt đầu bằng sms., * chỉ có thể khớp routingKey là một cấp, ví dụ (sms.A) hoặc (sms.B) gửi tin nhắn, # có thể khớp routingKey là một cấp trở lên, ví dụ (sms.A) hoặc (sms.A.QWE.IOP)

Vào cuối tháng, chúng ta cần gửi thông tin chấm công bất thường, thông tin tính lương, thông tin xin nghỉ phép của nhân viên cho nhân viên xem qua email, chúng ta biết đây là một kịch bản nhiều nhà sản xuất, một người tiêu thụ điển hình, thông tin chấm công bất thường, thông tin tính lương, thông tin xin nghỉ phép cần sản xuất tin nhắn và gửi đến RabbitMQ, sau đó cung cấp cho nhân viên của chúng ta tiêu thụ.

Mô phỏng 3 nhà sản xuất tương ứng: thông tin chấm công bất thường, thông tin tính lương, thông tin xin nghỉ phép

var factory = new ConnectionFactory()
 {
     HostName = "192.168.0.106",
     Port = 5672,
     Password = "admin",
     UserName = "admin",
     VirtualHost = "/"
 };
 // Nhà sản xuất tính lương
public IEnumerable<bool> GuiTinhLuong(TinhLuongDto tinhLuongDto)
{
using (var connection = factory.CreateConnection())
 {
     var channel = connection.CreateModel();
     //2. Định nghĩa bộ chuyển đổi chủ đề
     channel.ExchangeDeclare(exchange: "email_topic", type: "topic");

     string tinhLuongDtoJson = JsonConvert.SerializeObject(tinhLuongDto);
     var body = Encoding.UTF8.GetBytes(tinhLuongDtoJson);

     //3. Gửi tin nhắn
     var properties = channel.CreateBasicProperties();
     properties.Persistent = true; // Đặt tin nhắn bền vững
     //p1 Chỉ định bộ chuyển đổi
     //p2 routingKey 
     channel.BasicPublish(exchange: "email_topic", routingKey: "email.TinhLuong", basicProperties: properties, body: body);
 }
}

// Nhà sản xuất chấm công
public IEnumerable<bool> GuiChamCongBinhThuong(ChamCongBinhThuongDto chamCongBinhThuong)
{
using (var connection = factory.CreateConnection())
 {
     var channel = connection.CreateModel();
     //2. Định nghĩa bộ chuyển đổi chủ đề
     channel.ExchangeDeclare(exchange: "email_topic", type: "topic");

     string chamCongBinhThuongDtoJson = JsonConvert.SerializeObject(chamCongBinhThuong);
     var body = Encoding.UTF8.GetBytes(chamCongBinhThuongDtoJson);

     //3. Gửi tin nhắn
     var properties = channel.CreateBasicProperties();
     properties.Persistent = true; // Đặt tin nhắn bền vững
     //p1 Chỉ định bộ chuyển đổi
     //p2 routingKey 
     channel.BasicPublish(exchange: "email_topic", routingKey: "email.ChamCongBinhThuong", basicProperties: properties, body: body);
 }
}

// Nhà sản xuất thông tin xin nghỉ phép
public IEnumerable<bool> GuiXinNghiPhep(XinNghiPhepDto xinNghiPhep)
{
using (var connection = factory.CreateConnection())
 {
     var channel = connection.CreateModel();
     //2. Định nghĩa bộ chuyển đổi chủ đề
     channel.ExchangeDeclare(exchange: "email_topic", type: "topic");

     string xinNghiPhepJson = JsonConvert.SerializeObject(xinNghiPhep);
     var body = Encoding.UTF8.GetBytes(xinNghiPhepJson);

     //3. Gửi tin nhắn
     var properties = channel.CreateBasicProperties();
     properties.Persistent = true; // Đặt tin nhắn bền vững
     //p1 Chỉ định bộ chuyển đổi
     //p2 routingKey 
     channel.BasicPublish(exchange: "email_topic", routingKey: "email.XinNghiPhep", basicProperties: properties, body: body);
 }
}

public class DichVuRabbitmq : IHostedService
{
	  public Task StartAsync(CancellationToken cancellationToken)
      {
            var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                Port = 5672,
                Password = "guest",
                UserName = "guest",
                VirtualHost = "/"
            };
           
	              var connection = factory.CreateConnection();
                      var channel = connection.CreateModel();

			// 1. Định nghĩa bộ chuyển đổi
			channel.ExchangeDeclare(exchange: "email_topic", type: ExchangeType.Topic);

			// 2. Định nghĩa hàng đợi ngẫu nhiên
			var tenHangDoi = channel.QueueDeclare().QueueName;

			// 3. Liên kết hàng đợi với bộ chuyển đổi
			// * Khuyết điểm: chỉ có thể khớp một cấp
            // # Có thể khớp một cấp trở lên 
			channel.QueueBind(tenHangDoi, "email_topic", routingKey: "email.#");

			var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                Console.WriteLine($"model:{model}");
                var body = ea.Body;
                // 1. Mã nghiệp vụ
                var message = Encoding.UTF8.GetString(body.ToArray());
                Console.WriteLine(" [x] Gửi email {0}", message);
                // 1. Tin nhắn có được thêm vào cơ sở dữ liệu một cách bình thường không, vì vậy cần sử dụng xác nhận thủ công
                channel.BasicAck(ea.DeliveryTag, true);
            };
            // 3. Tiêu thụ tin nhắn
            channel.BasicQos(0, 1, false); // Qos (ngăn chặn nhiều người tiêu thụ, năng lực không đồng nhất, gây ra vấn đề chất lượng hệ thống.
            // autoAck đặt thành false không thực hiện xác nhận tự động                     
            channel.BasicConsume(queue: tenHangDoi, autoAck: false, consumer: consumer);
      }
      
      public Task StopAsync(CancellationToken cancellationToken)
      {
         // 1. Đóng kết nối rabbitmq
         throw new NotImplementedException();
      }
}

4. Header Exchange(Bộ chuyển đổi đầu)

Exchange loại headers không phụ thuộc vào quy tắc khớp giữa routing key và binding key để định tuyến tin nhắn, mà dựa trên thuộc tính headers trong nội dung tin nhắn được gửi để khớp. Khi liên kết Queue với Exchange, chỉ định một tập hợp các cặp khóa-giá trị và tham số x-match, tham số x-match là kiểu chuỗi, có thể được đặt thành any hoặc all. Nếu đặt thành any, có nghĩa là chỉ cần khớp được bất kỳ cặp khóa-giá trị nào trong bảng headers là được, all có nghĩa là cần khớp tất cả.

  1. Không cần phụ thuộc vào Key

  2. Thường xuyên hơn, loại Key Value này có thể được lưu trữ trong cơ sở dữ liệu, sau đó chúng ta có thể định nghĩa một quy tắc động để lắp ráp Key value này, từ đó đạt được việc định tuyến tin nhắn linh hoạt đến các hàng đợi khác nhau.

IV. Xác nhận tin nhắn RabbitMQ

Chúng ta đã đơn giản hóa triển khai một quy trình nghiệp vụ từ nhà sản xuất đến người tiêu thụ dựa trên các nghiệp vụ và mã ở trên, chúng ta có thể tóm tắt rằng, toàn bộ quá trình gửi và nhận tin nhắn bao gồm ba vai trò, nhà sản xuất (trang web quản lý nhân viên), RabbitMQ (Broker), người tiêu thụ (vi dịch vụ), trong trạng thái lý tưởng, theo cách triển khai này, toàn bộ quy trình và độ ổn định của hệ thống có thể không xảy ra vấn đề lớn, nhưng trong thực tế, chúng ta cần suy nghĩ về các vấn đề có thể xảy ra, chủ yếu từ ba khía cạnh lớn, sau đó phát triển.

  1. Phía nhà sản xuất

  2. Phía lưu trữ

  3. Phía người tiêu thụ

1. Phía nhà sản xuất

Khi gửi tin nhắn cho RabbitMQ, làm thế nào để đảm bảo tin nhắn chắc chắn đến? Chúng ta có thể sử dụng hai cơ chế xác nhận tin nhắn phía nhà sản xuất mà RabbitMQ cung cấp.

Chế độ Mô tả Cách thực hiện
Chế độ Confirm Chế độ xác nhận, sau khi nhà sản xuất gửi một tin nhắn, máy chủ Rabbitmq thực hiện một phản hồi, cho biết đã nhận được xác nhận tin nhắn Chế độ không đồng bộ, trước khi xác nhận, có thể tiếp tục gửi tin nhắn, tin nhắn đơn, tin nhắn hàng loạt
Chế độ Tx Dựa trên giao thức AMQP; có thể đặt channel thành một kênh có giao dịch, chia thành ba bước: 1. Bắt đầu giao dịch, nộp giao dịch, hoàn nguyên giao dịch Chế độ đồng bộ, trước khi nộp giao dịch không thể tiếp tục gửi tin nhắn, hiệu suất của chế độ giao dịch kém hơn một chút
  • 1. Thực hiện Confirm
using (var connection = factory.CreateConnection())
{
    var channel = connection.CreateModel();
    //2. Định nghĩa bộ chuyển đổi chủ đề
    channel.ExchangeDeclare(exchange: "email_topic", type: "topic");

    string chamCongBinhThuongDtoJson = JsonConvert.SerializeObject(chamCongBinhThuong);
    var body = Encoding.UTF8.GetBytes(chamCongBinhThuongDtoJson);

    //3. Gửi tin nhắn
    var properties = channel.CreateBasicProperties();
    properties.Persistent = true; // Đặt tin nhắn bền vững

    try
    {
        // Bật chế độ xác nhận tin nhắn
        channel.ConfirmSelect();
        channel.BasicPublish(exchange: "email_topic", 
        routingKey: "email.ChamCongBinhThuong", basicProperties: properties, body: body);
        // Nếu một hoặc nhiều tin nhắn đều được xác nhận
	    if (channel.WaitForConfirms()) 
        {
           Console.WriteLine($"【{message}】Gửi đến Broker thành công!");
        }
        else
        { 
           // Có thể ghi log, thử lại;
        }
        // Nếu tất cả tin nhắn được gửi thành công thì thực hiện bình thường; nếu có tin nhắn gửi thất bại; ném ra ngoại lệ;
        channel.WaitForConfirmsOrDie();
    }
    catch (Exception ex)
    {	
    	 Console.WriteLine($"【{message}】Gửi đến Broker thất bại!");
    }
}

  • 2. Thực hiện Tx
  using (var connection = factory.CreateConnection())
  {
      var channel = connection.CreateModel();
      //2. Định nghĩa bộ chuyển đổi chủ đề
      channel.ExchangeDeclare(exchange: "email_topic", type: "topic");
  
      string chamCongBinhThuongDtoJson = JsonConvert.SerializeObject(chamCongBinhThuong);
      var body = Encoding.UTF8.GetBytes(chamCongBinhThuongDtoJson);
  
      //3. Gửi tin nhắn
      var properties = channel.CreateBasicProperties();
      properties.Persistent = true; // Đặt tin nhắn bền vững
  
      try
      {
          // Bật cơ chế giao dịch, giao thức AMQP hỗ trợ
          channel.TxSelect(); // Giao dịch được hỗ trợ bởi giao thức
          channel.BasicPublish(exchange: "email_topic", 
          routingKey: "email.ChamCongBinhThuong", basicProperties: properties, body: body);
          // Nộp giao dịch chỉ khi giao dịch được nộp thì mới thực sự ghi vào hàng đợi
          channel.TxCommit();
      }
      catch (Exception ex)
      {	
      	// Hoàn nguyên giao dịch
    		 channel.TxRollback(); 
      }
  }

2. Phía lưu trữ

Sau khi nhà sản xuất gửi tin nhắn thành công cho RabbitMQ, nếu RabbitMQ bị treo, sẽ dẫn đến mất tin nhắn trong RabbitMQ, làm thế nào để giải quyết vấn đề mất tin nhắn? Đối với việc mất tin nhắn RabbitMQ, chúng ta có thể sử dụng trong nhà sản xuất

  1. Tin nhắn bền vững

  2. Cụm

3. Phía người tiêu thụ
    1. Người tiêu thụ bị treo, dẫn đến mất tin nhắn
    1. Thực hiện mã nghiệp vụ thất bại, nhưng tin nhắn đã được tiêu thụ

Khi nhà sản xuất ghi tin nhắn vào RabbitMQ, trong quá trình dịch vụ tiêu thụ nhận tin nhắn, máy chủ bị treo, dẫn đến mất tin nhắn, lúc này chúng ta nên sử dụng cơ chế xác nhận tin nhắn phía người tiêu thụ của RabbitMQ

Chế độ Mô tả Đặc điểm
Xác nhận tự động autoAck Xác nhận tự động, khi tiêu thụ tin nhắn, chỉ cần nhận được tin nhắn, sẽ trực tiếp xác nhận cho RabbitMQ rằng đã nhận được mọi thứ bình thường; Tổng quan tất cả, nếu có 10.000 tin nhắn, chỉ tiêu thụ thành công một tin nhắn, RabbitMQ cũng sẽ cho rằng bạn đã thành công, sẽ xóa tất cả tin nhắn khỏi hàng đợi; Điều này sẽ dẫn đến mất tin nhắn Xử lý rất nhanh
Xác nhận thủ công Người tiêu thụ tiêu thụ một, xác nhận một tin nhắn cho RabbitMQ, RabbitMQ chỉ xóa tin nhắn hiện tại này, tương đương với tiêu thụ một, xóa một tin nhắn; Hiệu suất hơi thấp hơn một chút

1. Xác nhận tự động

// Cơ chế xác nhận tin nhắn tự động
channel.BasicConsume(queue: "ThemChamCong", autoAck: true, consumer: consumer);

2. Xác nhận thủ công

Người tiêu thụ nhận được tin nhắn. Trong quá trình người tiêu thụ gửi xác nhận tin nhắn cho rabbitmq, thực hiện mã nghiệp vụ thất bại, nhưng tin nhắn đã được xác nhận là đã được tiêu thụ, chúng ta nên thực hiện cơ chế xác nhận tin nhắn thủ công sau khi người tiêu thụ nhận tin nhắn và thực hiện mã nghiệp vụ, đảm bảo tin nhắn không bị mất

var connection = factory.CreateConnection();
var channel = connection.CreateModel();
channel.ExchangeDeclare(exchange: "email_topic", type: ExchangeType.Topic);
var tenHangDoi = channel.QueueDeclare().QueueName;
channel.QueueBind(tenHangDoi, "email_topic", routingKey: "email.#");

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
      var message = Encoding.UTF8.GetString(ea.Body.ToArray());
      // Thực hiện mã nghiệp vụ
      
      // Xác nhận thủ công cho broker biết có thể xóa tin nhắn
      channel.BasicAck(ea.DeliveryTag, true);

      // Từ chối: cho broker biết, tôi không tiêu thụ tin nhắn này một cách bình thường; requeue: true: Ghi lại vào hàng đợi; false: Bạn vẫn xóa đi;
      //channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);
};

// autoAck đặt thành false không thực hiện xác nhận tự động                     
channel.BasicConsume(queue: tenHangDoi, autoAck: false, consumer: consumer);

    1. Do hiệu suất máy chủ không đồng nhất dẫn đến tích tụ tin nhắn

Nhà sản xuất gửi tin nhắn đồng thời cao, người tiêu thụ không kịp xử lý, dẫn đến tích tụ tin nhắn, làm thế nào để giải quyết vấn đề tích tụ tin nhắn? Có thể sử dụng cụm dịch vụ tiêu thụ, phân tán áp lực đến các thể hiện dịch vụ khác nhau có thể giải quyết vấn đề này, nhưng lại sinh ra một vấn đề mới về khuyết điểm cụm, giả sử hiệu suất của các máy chủ trong cụm không đồng nhất, máy chủ yếu hơn sẽ xử lý tin nhắn chậm, sẽ dẫn đến hầu hết tin nhắn tích tụ trên máy chủ hiệu suất kém, vậy làm thế nào để giải quyết? Chúng ta có thể sử dụng chức năng QOS của RabbitMQ, thường được gọi là giới hạn tốc độ, ý nghĩa của nó là người tiêu thụ có thể lấy một số lượng tin nhắn được chỉ định mỗi lần, trước khi các tin nhắn này được xử lý xong, sẽ không lấy tin nhắn từ hàng đợi nữa.

// Qos (ngăn chặn nhiều người tiêu thụ, năng lực không đồng nhất, gây ra vấn đề chất lượng hệ thống.
// Mỗi lần một người tiêu thụ chỉ tiêu thụ thành công một)
channel.BasicQos(0, 1, false); 

    1. Làm thế nào để đảm bảo tin nhắn không bị tiêu thụ lặp lại (tính lũy thừa)

1. Tin nhắn lặp lại khi sản xuất

Do nhà sản xuất gửi tin nhắn cho MQ, trong quá trình MQ xác nhận, có sự biến động mạng, nhà sản xuất không nhận được xác nhận, thực tế MQ đã nhận được tin nhắn. Lúc này nhà sản xuất sẽ gửi lại tin nhắn này một lần. Nếu tin nhắn của nhà sản xuất không được xác nhận hoặc xác nhận thất bại, chúng ta có thể sử dụng tác vụ định thời + (redis/db) để thử lại tin nhắn.

2. Tin nhắn lặp lại khi tiêu thụ

Người tiêu thụ tiêu thụ thành công, sau đó gửi xác nhận cho MQ, MQ không nhận được xác nhận, để đảm bảo tin nhắn được tiêu thụ, MQ sẽ tiếp tục gửi tin nhắn trước đó cho người tiêu thụ. Lúc này người tiêu thụ sẽ nhận được hai tin nhắn giống hệt nhau. Chúng ta có thể để mỗi tin nhắn mang một ID duy nhất toàn cục, có thể đảm bảo tính lũy thừa của tin nhắn. Người tiêu thụ nhận được tin nhắn, trước tiên truy vấn theo id trong redis/db xem có tồn tại tin nhắn này không. Nếu không tồn tại, thì tiêu thụ bình thường, sau khi tiêu thụ xong, ghi vào redis/db. Nếu tồn tại, thì chứng minh rằng tin nhắn đã được tiêu thụ, trực tiếp bỏ qua.

Thẻ: rabbitmq AMQP Message Queue middleware Fanout

Đăng vào ngày 9 tháng 6 lúc 03:30