Để triển khai cơ chế xử lý bất đồng bộ cho hệ thống đặt hàng, RabbitMQ là một lựa chọn phổ biến. Trong bài viết này, chúng ta sẽ xây dựng hai loại hàng đợi: hàng đợi tiêu thụ thông thường và hàng đợi tiêu thụ trễ (delay queue) bằng cách tận dụng tính năng Dead Letter Exchange (DLX) của RabbitMQ.
Cài đặt thư viện
Sử dụng php-amqplib/php-amqplib để tương tác với RabbitMQ:
composer require php-amqplib/php-amqplib
Cấu hình cơ sở
Lớp RabbitMqBase định nghĩa các thành phần cốt lõi như exchange, queue và thiết lập DLX:
<?php
namespace App\Service;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;
class RabbitMqBase
{
public const DLX_QUEUE = 'dlx.queue';
public const DLX_EXCHANGE = 'dlx.exchange';
public const DLX_ROUTING_KEY = 'dlx.routing.key';
public const MAIN_QUEUE = 'main.queue';
public const MAIN_EXCHANGE = 'main.exchange';
private static function getConfig()
{
$env = config('system.is_online') ? 'online' : 'offline';
return config("system.{$env}");
}
public static function createConnection(): AMQPStreamConnection
{
$cfg = self::getConfig();
$conn = new AMQPStreamConnection(
$cfg['host'],
$cfg['port'],
$cfg['name'],
$cfg['password']
);
self::setupQueues($conn);
return $conn;
}
private static function setupQueues(AMQPStreamConnection $connection): void
{
$ch = $connection->channel();
// Khai báo exchange
$ch->exchange_declare(self::DLX_EXCHANGE, AMQPExchangeType::DIRECT, false, true, false);
$ch->exchange_declare(self::MAIN_EXCHANGE, AMQPExchangeType::FANOUT, false, true, false);
// Cấu hình hàng đợi chính với TTL và DLX
$args = new AMQPTable();
$args->set('x-message-ttl', 5000); // 5 giây
$args->set('x-dead-letter-exchange', self::DLX_EXCHANGE);
$args->set('x-dead-letter-routing-key', self::DLX_ROUTING_KEY);
$ch->queue_declare(self::MAIN_QUEUE, false, true, false, false, false, $args);
$ch->queue_declare(self::DLX_QUEUE, false, true, false, false);
// Gắn kết queue với exchange
$ch->queue_bind(self::MAIN_QUEUE, self::MAIN_EXCHANGE);
$ch->queue_bind(self::DLX_QUEUE, self::DLX_EXCHANGE, self::DLX_ROUTING_KEY);
$ch->close();
}
}
Nhà sản xuất (Producer)
Gửi tin nhắn vào hàng đợi chính thông qua exchange:
<?php
namespace App\Service;
use PhpAmqpLib\Message\AMQPMessage;
class OrderProducer extends RabbitMqBase
{
public static function publishOrders(): void
{
$conn = self::createConnection();
$ch = $conn->channel();
for ($i = 0; $i < 5; $i++) {
$payload = json_encode([
'user_id' => random_int(1, 100),
'amount' => random_int(10000, 99999),
'order_no' => random_int(100, 999)
]);
$msg = new AMQPMessage($payload);
$ch->basic_publish($msg, self::MAIN_EXCHANGE);
echo "[x] Sent at " . date('Y-m-d H:i:s') . ": {$payload}\n";
}
$ch->close();
$conn->close();
}
}
Người tiêu thụ chính (Main Consumer)
Xử lý tin nhắn từ hàng đợi chính. Nếu xử lý thất bại hoặc chủ động từ chối, tin nhắn sẽ được chuyển sang DLX:
<?php
namespace App\Service;
use PhpAmqpLib\Message\AMQPMessage;
class MainConsumer extends RabbitMqBase
{
public static function consume(): void
{
$conn = self::createConnection();
$ch = $conn->channel();
$callback = function (AMQPMessage $msg) {
echo "[x] Processing: " . $msg->body . " at " . date('Y-m-d H:i:s') . "\n";
// Giả lập xử lý lâu hơn TTL (5s)
sleep(10);
// Từ chối và không đưa lại vào hàng đợi → chuyển sang DLX
$msg->ack(false); // basic.nack với requeue=false
};
$ch->basic_consume(self::MAIN_QUEUE, '', false, false, false, false, $callback);
while ($ch->is_consuming()) {
$ch->wait();
}
$ch->close();
$conn->close();
}
}
Người tiêu thụ DLX (Delay Consumer)
Nghe và xử lý các tin nhắn đã hết hạn hoặc bị từ chối:
<?php
namespace App\Service;
use PhpAmqpLib\Message\AMQPMessage;
class DelayedConsumer extends RabbitMqBase
{
public static function consumeDelayed(): void
{
$conn = self::createConnection();
$ch = $conn->channel();
$callback = function (AMQPMessage $msg) {
echo "[✓] Delayed message processed: " . $msg->body . " at " . date('Y-m-d H:i:s') . "\n";
$msg->ack(); // Xác nhận hoàn tất
};
$ch->basic_consume(self::DLX_QUEUE, '', false, true, false, false, $callback);
while ($ch->is_consuming()) {
$ch->wait();
}
$ch->close();
$conn->close();
}
}
Lưu ý quan trọng
- Loại exchange:
FANOUTbroadcast tin nhắn đến mọi queue được bind, trong khiDIRECTyêu cầu routing key khớp chính xác. - Dead Letter Conditions: Tin nhắn trở thành dead letter khi:
- Bị từ chối với
requeue=false, - Hết thời gian sống (TTL),
- Hàng đợi đạt giới hạn độ dài.
- Bị từ chối với
- Delay Queue: Không khởi chạy
MainConsumer→ tin nhắn tự động chuyển sang DLX sau TTL → hiệu ứng delay. - Xác nhận thủ công: Sử dụng
basic_ackđể xác nhận xử lý thành công,basic_nack(..., false)để từ chối vĩnh viễn. - Consumer phải chạy trong môi trường CLI, trong khi Producer có thể gọi từ HTTP nếu cần.
Luồng dữ liệu:
Producer → main.exchange → main.queue (TTL=5s) → [hết hạn] → dlx.exchange → dlx.queue → DelayedConsumer