Gói đóng gói .NET RabbitMQ với Trực tiếp Trao đổi và Hàng đợi Thư chết (Phần 11)

Thư mục Connector - Mã nguồn

 public interface IRabbitConnector : IDisposable
    {
        /// <summary>
        /// Khởi tạo trao đổi và hàng đợi
        /// </summary>
        /// <param name="config"></param>
        /// <returns></returns>
        Task SetupExchangeAndQueueAsync(MessageConfiguration config);      
        /// <summary>
        /// Lấy khóa bộ nhớ cache - lưu trữ kênh IChannel đã khởi tạo vào cache để tối ưu hiệu suất
        /// </summary>
        /// <param name="config">Đối tượng khai báo trao đổi và hàng đợi</param>
        /// <returns></returns>
        string GenerateCacheKey(MessageConfiguration config);     

        /// <summary>
        /// Kênh truyền thông
        /// </summary>
        IChannel CommunicationChannel { get; }
    }
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace RabbitMQWrapper.Connection
{
    /// <summary>
    /// Lớp này cần được triển khai theo mô hình Singleton
    /// </summary>
    public class RabbitConnector : IRabbitConnector
    {
        private IConnection connection;
        private IChannel channel;
        private ConnectionFactory factory;
        private readonly RabbitSettings rabbitSettings;
        
        public RabbitConnector(IOptions<RabbitSettings> rabbitSettings)
        {
            this.rabbitSettings = rabbitSettings.Value;
            Initialize();
        }
        
        /// <summary>
        /// Khởi tạo kết nối
        /// </summary>
        private void Initialize()
        {
            factory = new ConnectionFactory()
            {
                UserName = rabbitSettings.UserName,
                Password = rabbitSettings.Password,
                VirtualHost = rabbitSettings.VirtualHost,
                HostName = rabbitSettings.HostName,
                Port = rabbitSettings.Port,
                DispatchConsumersAsync = true
            };           
            this.connection = factory.CreateConnection();
            this.channel = connection.CreateChannel();
        }
        
        /// <summary>
        /// Kênh truyền thông
        /// </summary>
        public IChannel CommunicationChannel => this.channel;

        /// <summary>
        /// Khởi tạo trao đổi và hàng đợi
        /// </summary>
        /// <param name="config"></param>
        /// <returns></returns>
        public async Task SetupExchangeAndQueueAsync(MessageConfiguration config)
        {
            if (!this.channel.IsOpen)
            {
                throw new Exception("Kênh truyền thông chưa được mở!!!");
            }

            //Nếu có định nghĩa hàng đợi chết
            if (config.Arguments != null && config.Arguments.ContainsKey("x-dead-letter-exchange"))
            {
                //Trao đổi hàng đợi chết
                string deadLetterExchange = config.Arguments["x-dead-letter-exchange"].ToString();
                //Hàng đợi chết
                string deadLetterQueue = config.Arguments["x-dead-letter-queue"].ToString();
                //Khóa định tuyến hàng đợi chết
                string deadLetterRoutingKey = config.Arguments["x-dead-letter-routing-key"].ToString();
                
                await channel.ExchangeDeclareAsync(exchange: deadLetterExchange, type: ExchangeType.Direct, passive: false, durable: true, autoDelete: false, arguments: null);
                await channel.QueueDeclareAsync(queue: deadLetterQueue, passive: false, durable: true, exclusive: false, autoDelete: false, arguments: null);
                await channel.QueueBindAsync(queue: deadLetterQueue, exchange: deadLetterExchange, routingKey: deadLetterRoutingKey, arguments: null);
            }
            
            await channel.ExchangeDeclareAsync(exchange: config.ExchangeName, type: ExchangeType.Direct, passive: false, durable: true, autoDelete: false, arguments: config.Arguments);
            await channel.QueueDeclareAsync(queue: config.QueueName, passive: false, durable: true, exclusive: false, autoDelete: false, arguments: config.Arguments);
            await channel.QueueBindAsync(queue: config.QueueName, exchange: config.ExchangeName, routingKey: config.RoutingKey, arguments: config.Arguments);
        }

        /// <summary>
        /// Tạo khóa bộ nhớ cache
        /// </summary>
        /// <param name="config"></param>
        /// <returns></returns>
        public string GenerateCacheKey(MessageConfiguration config)
        {
            string key = $"{config.ExchangeName}_{config.QueueName}_{config.RoutingKey}";
            
            //Nếu là hàng đợi chết
            if (config.Arguments != null && config.Arguments.ContainsKey("x-dead-letter-exchange"))
            {
                //Trao đổi hàng đợi chết
                string deadLetterExchange = config.Arguments["x-dead-letter-exchange"].ToString();
                //Hàng đợi chết
                string deadLetterQueue = config.Arguments["x-dead-letter-queue"].ToString();
                //Khóa định tuyến hàng đợi chết
                string deadLetterRoutingKey = config.Arguments["x-dead-letter-routing-key"].ToString();
                return key += $"{deadLetterExchange}_{deadLetterQueue}_{deadLetterRoutingKey}";
            }
            
            return key;
        }
        
        public async void Dispose()
        {
            if (connection != null)
            {
                await connection.CloseAsync();
            }
            if (channel != null)
            {
                await channel.CloseAsync();
            }
        }
    }
}

Thư mục Consumer - Mã nguồn

using RabbitMQWrapper.Models;
using System;
using System.Threading.Tasks;

namespace RabbitMQWrapper
{
    /// <summary>
    /// Giao diện người tiêu dùng
    /// </summary>
    public interface IMessageConsumer
    {
        /// <summary>
        /// Nhận tin nhắn
        /// </summary>
        /// <param name="config">Cấu hình trao đổi và hàng đợi</param>
        /// <param name="messageHandler">Hàm xử lý tin nhắn, trả về trạng thái xử lý</param>
        void Receive(MessageConfiguration config, Func<string, bool> messageHandler);
    }
}
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading.Tasks;

namespace RabbitMQWrapper
{
    /// <summary>
    /// Người tiêu dùng tin nhắn
    /// </summary>
    public class MessageConsumer : IMessageConsumer
    {
        private readonly IRabbitConnector connector;
        private readonly ILogger<MessageConsumer> logger;
        private readonly IMemoryCache cache;

        public MessageConsumer(IRabbitConnector connector, ILogger<MessageConsumer> logger, IMemoryCache cache)
        {
            this.connector = connector;
            this.logger = logger;
            this.cache = cache;
        }

        /// <summary>
        /// Nhận tin nhắn
        /// </summary>
        /// <param name="config">Cấu hình trao đổi và hàng đợi</param>
        /// <param name="messageHandler">Hàm xử lý tin nhắn</param>
        public async void Receive(MessageConfiguration config, Func<string, bool> messageHandler)
        {
            //Lấy khóa bộ nhớ cache
            string cacheKey = this.connector.GenerateCacheKey(config);
            
            //Kiểm tra xem kênh có tồn tại trong bộ nhớ cache không
            if (!this.cache.TryGetValue<IChannel>(cacheKey, out IChannel channel))
            {
                //Khởi tạo trao đổi
                await this.connector.SetupExchangeAndQueueAsync(config);
                channel = this.connector.CommunicationChannel;
                this.cache.Set<IChannel>(cacheKey, channel);
            }
            
            //Người tiêu dùng sự kiện bất đồng bộ
            var consumer = new AsyncEventingBasicConsumer(channel);
            consumer.Received += async (ch, ea) =>
            {
                string messageContent = Encoding.UTF8.GetString(ea.Body.ToArray(), 0, ea.Body.Length);
                
                if (messageHandler(messageContent))
                {
                    //Xác nhận nhận tin nhắn
                    await channel.BasicAckAsync(ea.DeliveryTag, false);
                }
                else
                {
                    //Từ chối tin nhắn
                    await channel.BasicRejectAsync(ea.DeliveryTag, false);
                }
            };
            
            string consumerTag = this.connector.CommunicationChannel.BasicConsume(config.QueueName, false, consumer);
        }
    }
}

Thư mục Models - Mã nguồn

using System.Collections.Generic;

namespace RabbitMQWrapper.Models
{
    /// <summary>
    /// Cấu hình trao đổi và hàng đợi
    /// </summary>
    public class MessageConfiguration
    {
        /// <summary>
        /// Tên trao đổi
        /// </summary>
        public string ExchangeName { get; set; }
        
        /// <summary>
        /// Tên hàng đợi
        /// </summary>
        public string QueueName { get; set; }

        /// <summary>
        /// Khóa định tuyến
        /// </summary>
        public string RoutingKey { get; set; }

        /// <summary>
        /// Tham số bổ sung
        /// </summary>
        public Dictionary<string, object> Arguments { get; set; } = null;
    }   
}
using System;

namespace RabbitMQWrapper.Models
{
    /// <summary>
    /// Khai báo tên trao đổi và hàng đợi để tái sử dụng
    /// </summary>
    public static class MessageConfigurationConstants
    {
        public static readonly object ProducerLock = new object();

        #region Khai báo trao đổi loại direct
        public const string TestDirectExchange = "TestDirectExchange";
        public const string TestDirectQueue = "TestDirectQueue";
        public const string TestDirectKey = "TestDirectKey";
        #endregion

        #region Khai báo trao đổi và hàng đợi chết
        /// <summary>
        /// Trao đổi hàng đợi chết
        /// </summary>
        public const string DeadLetterExchange = "DeadLetterExchange";
        /// <summary>
        /// Hàng đợi chết
        /// </summary>
        public const string DeadLetterQueue = "DeadLetterQueue";
        /// <summary>
        /// Khóa định tuyến hàng đợi chết
        /// </summary>
        public const string DeadLetterRoutingKey = "DeadLetterRoutingKey";
        /// <summary>
        /// Trao đổi bình thường trong hàng đợi chết
        /// </summary>
        public const string NormalExchange = "NormalExchange";
        /// <summary>
        /// Hàng đợi bình thường trong hàng đợi chết
        /// </summary>
        public const string NormalQueue = "NormalQueue";
        /// <summary>
        /// Khóa định tuyến bình thường trong hàng đợi chết
        /// </summary>
        public const string NormalRoutingKey = "NormalRoutingKey";
        #endregion
    }
}
using System;

namespace RabbitMQWrapper.Models
{
    /// <summary>
    /// Lớp truyền tin nhắn
    /// </summary>
    public class MessageTransmission
    {
        /// <summary>
        /// Cấu hình trao đổi và hàng đợi
        /// </summary>
        public MessageConfiguration Configuration { get; set; }
        
        /// <summary>
        /// Nội dung tin nhắn
        /// </summary>
        public object Content { get; set; }
    }
}

Thư mục Producer - Mã nguồn

using RabbitMQ.Client;
using System;
using System.Threading.Tasks;

namespace RabbitMQWrapper
{
    /// <summary>
    /// Giao diện nhà sản xuất
    /// </summary>
    public interface IMessageProducer
    {
        /// <summary>
        /// Gửi tin nhắn
        /// </summary>
        /// <param name="message">Đối tượng truyền tin nhắn</param>
        /// <returns></returns>
        Task SendMessageAsync(MessageTransmission message);
    }
}
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using RabbitMQ.Client;
using System;
using System.Text;
using System.Threading.Tasks;

namespace RabbitMQWrapper
{
    /// <summary>
    /// Trực tiếp trao đổi loại exchange
    /// </summary>
    public class MessageProducer : IMessageProducer
    {
        private readonly IRabbitConnector connector;
        private readonly ILogger<MessageProducer> logger;
        private readonly IMemoryCache cache;
        
        public MessageProducer(IRabbitConnector connector, ILogger<MessageProducer> logger, IMemoryCache cache)
        {
            this.connector = connector;
            this.logger = logger;
            this.cache = cache;
        }
        
        public async Task SendMessageAsync(MessageTransmission message)
        {
            try
            {
                //Lấy khóa bộ nhớ cache
                string cacheKey = connector.GenerateCacheKey(message.Configuration);
                
                //Kiểm tra xem kênh có tồn tại trong bộ nhớ cache không
                if (!cache.TryGetValue(cacheKey, out IChannel channel))
                {
                    //Khởi tạo trao đổi
                    await connector.SetupExchangeAndQueueAsync(message.Configuration);
                    channel = connector.CommunicationChannel;
                    cache.Set(cacheKey, channel);
                }
                
                //Gửi tin nhắn
                string messageBody = JsonConvert.SerializeObject(message.Content);
                byte[] messageBytes = Encoding.UTF8.GetBytes(messageBody);
                
                //Bắt đầu giao dịch
                await connector.CommunicationChannel.TxSelectAsync();
                await channel.BasicPublishAsync(message.Configuration.ExchangeName, message.Configuration.RoutingKey, messageBytes);
                
                //Hoàn thành giao dịch
                await connector.CommunicationChannel.TxCommitAsync();
                logger.LogInformation($"Gửi tin nhắn: {messageBody}");
            }
            catch (Exception ex)
            {
                //Hoàn tác giao dịch
                await connector.CommunicationChannel.TxRollbackAsync();
                logger.LogError($"Gửi tin nhắn thất bại: {ex.Message}", ex);
            }
        }
    }
}

Thư mục RabbitMQSettings - Mã nguồn

using System;

namespace RabbitMQWrapper.Settings
{
    /// <summary>
    /// Cấu hình RabbitMQ
    /// </summary>
    public class RabbitSettings
    {
        /// <summary>
        /// Tên người dùng
        /// </summary>
        public string UserName { get; set; }
        
        /// <summary>
        /// Mật khẩu
        /// </summary>
        public string Password { get; set; }
        
        /// <summary>
        /// Máy chủ ảo, ví dụ: center
        /// </summary>
        public string VirtualHost { get; set; }
        
        /// <summary>
        /// Máy chủ RabbitMQ, ví dụ: 192.168.0.168
        /// </summary>
        public string HostName { get; set; }
        
        /// <summary>
        /// Cổng, mặc định 5672
        /// </summary>
        public int Port { get; set; } = 5672;
    }
}
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using RabbitMQWrapper.Connection;
using RabbitMQWrapper;
using System;

namespace RabbitMQWrapper.Extensions
{
    public static class RabbitMQExtensions
    {
        /// <summary>
        /// Thêm nhà sản xuất và người tiêu dùng
        /// </summary>
        /// <param name="services"></param>
        /// <param name="config"></param>
        /// <returns></returns>
        public static IServiceCollection AddRabbitMQ(this IServiceCollection services, IConfiguration config)
        {
            services.Configure<RabbitSettings>(config.GetSection("RabbitSettings"));
            services.AddMemoryCache();
            services.AddSingleton<IRabbitConnector, RabbitConnector>();
            services.AddScoped<IMessageProducer, MessageProducer>();
            services.AddScoped<IMessageConsumer, MessageConsumer>();
            return services;
        }
        
        /// <summary>
        /// Thêm RabbitMQ chỉ với nhà sản xuất
        /// </summary>
        /// <param name="services"></param>
        /// <param name="config"></param>
        /// <returns></returns>
        public static IServiceCollection AddRabbitMQProducer(this IServiceCollection services, IConfiguration config)
        {
            services.Configure<RabbitSettings>(config.GetSection("RabbitSettings"));
            services.AddMemoryCache();
            services.AddSingleton<IRabbitConnector, RabbitConnector>();
            services.AddScoped<IMessageProducer, MessageProducer>();
            return services;
        }
        
        /// <summary>
        /// Thêm RabbitMQ chỉ với người tiêu dùng
        /// </summary>
        /// <param name="services"></param>
        /// <param name="config"></param>
        /// <returns></returns>
        public static IServiceCollection AddRabbitMQConsumer(this IServiceCollection services, IConfiguration config)
        {
            services.Configure<RabbitSettings>(config.GetSection("RabbitSettings"));
            services.AddMemoryCache();
            services.AddSingleton<IRabbitConnector, RabbitConnector>();
            services.AddScoped<IMessageConsumer, MessageConsumer>();
            return services;
        }
    }
}

Đây là mã nguồn của thư viện đóng gói RabbitMQ

Dưới đây là API kiểm thử

appsettings.json

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning"
    }
  },
  "AllowedHosts": "*",
  "RabbitSettings": {
    "HostName": "192.168.0.168",
    "Port": "5672",
    "UserName": "test",
    "Password": "test",
    "VirtualHost": "center"
  }
}

Program.cs

using RabbitMQWrapper.Extensions;
using RabbitMQTestAPI;
using System.Runtime.CompilerServices;

var builder = WebApplication.CreateBuilder(args);

// Thêm dịch vụ vào container

builder.Services.AddControllers();
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
builder.Services.AddRabbitMQ(builder.Configuration);
builder.Services.AddHostedService<DirectExchangeTestService>();
builder.Services.AddHostedService<DeadLetterQueueTestService>();
var app = builder.Build();

// Cấu hình pipeline HTTP

if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

app.UseHttpsRedirection();
app.UseAuthorization();
app.MapControllers();
app.Run();

DeadLetterQueueTestService

using Microsoft.Extensions.Hosting;
using RabbitMQWrapper;
using RabbitMQWrapper.Models;
using RabbitMQTestAPI.Controllers;

namespace RabbitMQTestAPI
{
    /// <summary>
    /// Kiểm thử hàng đợi chết
    /// </summary>
    public class DeadLetterQueueTestService : BackgroundService
    {
        private readonly ILogger<RabbitConsumerController> logger;
        private readonly IServiceScopeFactory scopeFactory;
        
        public DeadLetterQueueTestService(ILogger<RabbitConsumerController> logger, IServiceScopeFactory scopeFactory)
        {
            this.logger = logger;
            this.scopeFactory = scopeFactory;
        }
        
        protected override Task ExecuteAsync(CancellationToken stoppingToken)
        {
            using IServiceScope scope = this.scopeFactory.CreateScope();
            IMessageConsumer consumer = scope.ServiceProvider.GetRequiredService<IMessageConsumer>();
            
            Task.Factory.StartNew(() =>
             {
                 consumer.Receive(
                     new MessageConfiguration
                     {
                         ExchangeName = MessageConfigurationConstants.DeadLetterExchange,
                         QueueName = MessageConfigurationConstants.DeadLetterQueue,
                         RoutingKey = MessageConfigurationConstants.DeadLetterRoutingKey,                       
                     }, message =>
                     {
                         this.logger.LogInformation($"Nhận tin nhắn hàng đợi chết: {message}");
                         return true;
                     });
             }, TaskCreationOptions.LongRunning);
            
            return Task.CompletedTask;
        }
    }
}

DirectExchangeTestService

using RabbitMQWrapper;
using RabbitMQWrapper.Models;
using RabbitMQTestAPI.Controllers;

namespace RabbitMQTestAPI
{
    /// <summary>
    /// Kiểm thử trao đổi trực tiếp
    /// </summary>
    public class DirectExchangeTestService : IHostedService
    {
        private readonly ILogger<RabbitConsumerController> logger;
        private readonly IServiceScopeFactory scopeFactory;
        
        public DirectExchangeTestService(ILogger<RabbitConsumerController> logger, IServiceScopeFactory scopeFactory)
        {
            this.logger = logger;
            this.scopeFactory = scopeFactory;
        }
        
        public Task StartAsync(CancellationToken cancellationToken)
        {
            using AsyncServiceScope asyncScope = scopeFactory.CreateAsyncScope();
            IMessageConsumer consumer = asyncScope.ServiceProvider.GetRequiredService<IMessageConsumer>();
            
            Task.Factory.StartNew(() =>
            {
                consumer.Receive(
                 new MessageConfiguration
                 {
                     ExchangeName = MessageConfigurationConstants.TestDirectExchange,
                     QueueName = MessageConfigurationConstants.TestDirectQueue,
                     RoutingKey = MessageConfigurationConstants.TestDirectKey,
                 }, message =>
                 {
                     this.logger.LogInformation($"Nhận tin nhắn: {message}");
                     return true;
                 });
            }, TaskCreationOptions.LongRunning);
            
            return Task.CompletedTask;
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            throw new NotImplementedException();
        }
    }
}
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using RabbitMQWrapper;
using RabbitMQWrapper.Models;

namespace RabbitMQTestAPI.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class RabbitProducerController : ControllerBase
    {
        private readonly ILogger<RabbitProducerController> logger;
        private readonly IMessageProducer producer;
        private static readonly object lockObject = new object();
        private static SemaphoreSlim messageSemaphore = new SemaphoreSlim(1);
        
        public RabbitProducerController(ILogger<RabbitProducerController> logger, IMessageProducer producer)
        {
            this.logger = logger;
            this.producer = producer;
        }
        
        /// <summary>
        /// Gửi tin nhắn trực tiếp
        /// </summary>
        /// <param name="content"></param>
        /// <returns></returns>
        [HttpPost]
        public async Task<ActionResult> SendMessage([FromBody] string content)
        {
            await producer.SendMessageAsync(new MessageTransmission
            {
                Configuration = new MessageConfiguration
                {
                    ExchangeName = MessageConfigurationConstants.TestDirectExchange,
                    QueueName = MessageConfigurationConstants.TestDirectQueue,
                    RoutingKey = MessageConfigurationConstants.TestDirectKey
                },
                Content = content
            });
            
            return Ok("Gửi thành công!");
        }

        /// <summary>
        /// Gửi tin nhắn có độ trễ
        /// </summary>
        /// <param name="student"></param>
        /// <returns></returns>
        [HttpPost("SendDelayedMessage")]
        public async Task<ActionResult> SendDelayedMessage([FromBody] Student student)
        {
            if (await messageSemaphore.WaitAsync(1000))
            {
                await producer.SendMessageAsync(new MessageTransmission
                {
                    Configuration = new MessageConfiguration
                    {
                        ExchangeName = MessageConfigurationConstants.NormalExchange,
                        QueueName = MessageConfigurationConstants.NormalQueue,
                        RoutingKey = MessageConfigurationConstants.NormalRoutingKey,
                        Arguments = new Dictionary<string, object>
                        {
                            ["x-message-ttl"] = 12000,
                            ["x-dead-letter-exchange"] = MessageConfigurationConstants.DeadLetterExchange,
                            ["x-dead-letter-queue"] = MessageConfigurationConstants.DeadLetterQueue,
                            ["x-dead-letter-routing-key"] = MessageConfigurationConstants.DeadLetterRoutingKey,
                        }
                    },
                    Content = student
                });
            }
            
            messageSemaphore.Release();
            return Ok("Gửi thành công!");
        }
    }
}
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using RabbitMQWrapper;
using RabbitMQWrapper.Models;

namespace RabbitMQTestAPI.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class RabbitConsumerController : ControllerBase
    {
        private readonly ILogger<RabbitConsumerController> logger;
        private readonly IMessageConsumer consumer;

        public RabbitConsumerController(ILogger<RabbitConsumerController> logger, IMessageConsumer consumer)
        {
            this.logger = logger;
            this.consumer = consumer;
        }

        [HttpGet]
        public ActionResult ReceiveMessage()
        {
            this.consumer.Receive(
                new MessageConfiguration
                {
                    ExchangeName = MessageConfigurationConstants.TestDirectExchange,
                    QueueName = MessageConfigurationConstants.TestDirectQueue,
                    RoutingKey = MessageConfigurationConstants.TestDirectKey,
                }, message =>
                {
                    this.logger.LogInformation($"Nhận tin nhắn: {message}");
                    return true;
                });
            
            return Ok();
        }

        /// <summary>
        /// Nhận tin nhắn từ hàng đợi chết
        /// </summary>
        /// <returns></returns>
        [HttpGet("ReceiveDeadLetter")]
        public ActionResult ReceiveDeadLetterMessage()
        {
            this.consumer.Receive(
                new MessageConfiguration
                {
                    ExchangeName = MessageConfigurationConstants.DeadLetterExchange,
                    QueueName = MessageConfigurationConstants.DeadLetterQueue,
                    RoutingKey = MessageConfigurationConstants.DeadLetterRoutingKey
                }, message =>
                {
                    this.logger.LogInformation($"Nhận tin nhắn hàng đợi chết: {message}");
                    return true;
                });
            
            return Ok();
        }
    }
}

Thẻ: .NET rabbitmq Direct Exchange Dead Letter Queue Message Queue

Đăng vào ngày 4 tháng 7 lúc 15:47