Sử dụng RabbitMQ trong PHP để xử lý hàng đợi đơn hàng và hàng đợi tiêu thụ trễ

Để triển khai cơ chế xử lý bất đồng bộ cho hệ thống đặt hàng, RabbitMQ là một lựa chọn phổ biến. Trong bài viết này, chúng ta sẽ xây dựng hai loại hàng đợi: hàng đợi tiêu thụ thông thường và hàng đợi tiêu thụ trễ (delay queue) bằng cách tận dụng tính năng Dead Letter Exchange (DLX) của RabbitMQ.

Cài đặt thư viện

Sử dụng php-amqplib/php-amqplib để tương tác với RabbitMQ:

composer require php-amqplib/php-amqplib

Cấu hình cơ sở

Lớp RabbitMqBase định nghĩa các thành phần cốt lõi như exchange, queue và thiết lập DLX:

<?php

namespace App\Service;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;

class RabbitMqBase
{
    public const DLX_QUEUE = 'dlx.queue';
    public const DLX_EXCHANGE = 'dlx.exchange';
    public const DLX_ROUTING_KEY = 'dlx.routing.key';

    public const MAIN_QUEUE = 'main.queue';
    public const MAIN_EXCHANGE = 'main.exchange';

    private static function getConfig()
    {
        $env = config('system.is_online') ? 'online' : 'offline';
        return config("system.{$env}");
    }

    public static function createConnection(): AMQPStreamConnection
    {
        $cfg = self::getConfig();
        $conn = new AMQPStreamConnection(
            $cfg['host'],
            $cfg['port'],
            $cfg['name'],
            $cfg['password']
        );
        self::setupQueues($conn);
        return $conn;
    }

    private static function setupQueues(AMQPStreamConnection $connection): void
    {
        $ch = $connection->channel();

        // Khai báo exchange
        $ch->exchange_declare(self::DLX_EXCHANGE, AMQPExchangeType::DIRECT, false, true, false);
        $ch->exchange_declare(self::MAIN_EXCHANGE, AMQPExchangeType::FANOUT, false, true, false);

        // Cấu hình hàng đợi chính với TTL và DLX
        $args = new AMQPTable();
        $args->set('x-message-ttl', 5000); // 5 giây
        $args->set('x-dead-letter-exchange', self::DLX_EXCHANGE);
        $args->set('x-dead-letter-routing-key', self::DLX_ROUTING_KEY);

        $ch->queue_declare(self::MAIN_QUEUE, false, true, false, false, false, $args);
        $ch->queue_declare(self::DLX_QUEUE, false, true, false, false);

        // Gắn kết queue với exchange
        $ch->queue_bind(self::MAIN_QUEUE, self::MAIN_EXCHANGE);
        $ch->queue_bind(self::DLX_QUEUE, self::DLX_EXCHANGE, self::DLX_ROUTING_KEY);

        $ch->close();
    }
}

Nhà sản xuất (Producer)

Gửi tin nhắn vào hàng đợi chính thông qua exchange:

<?php

namespace App\Service;

use PhpAmqpLib\Message\AMQPMessage;

class OrderProducer extends RabbitMqBase
{
    public static function publishOrders(): void
    {
        $conn = self::createConnection();
        $ch = $conn->channel();

        for ($i = 0; $i < 5; $i++) {
            $payload = json_encode([
                'user_id' => random_int(1, 100),
                'amount' => random_int(10000, 99999),
                'order_no' => random_int(100, 999)
            ]);

            $msg = new AMQPMessage($payload);
            $ch->basic_publish($msg, self::MAIN_EXCHANGE);

            echo "[x] Sent at " . date('Y-m-d H:i:s') . ": {$payload}\n";
        }

        $ch->close();
        $conn->close();
    }
}

Người tiêu thụ chính (Main Consumer)

Xử lý tin nhắn từ hàng đợi chính. Nếu xử lý thất bại hoặc chủ động từ chối, tin nhắn sẽ được chuyển sang DLX:

<?php

namespace App\Service;

use PhpAmqpLib\Message\AMQPMessage;

class MainConsumer extends RabbitMqBase
{
    public static function consume(): void
    {
        $conn = self::createConnection();
        $ch = $conn->channel();

        $callback = function (AMQPMessage $msg) {
            echo "[x] Processing: " . $msg->body . " at " . date('Y-m-d H:i:s') . "\n";

            // Giả lập xử lý lâu hơn TTL (5s)
            sleep(10);

            // Từ chối và không đưa lại vào hàng đợi → chuyển sang DLX
            $msg->ack(false); // basic.nack với requeue=false
        };

        $ch->basic_consume(self::MAIN_QUEUE, '', false, false, false, false, $callback);

        while ($ch->is_consuming()) {
            $ch->wait();
        }

        $ch->close();
        $conn->close();
    }
}

Người tiêu thụ DLX (Delay Consumer)

Nghe và xử lý các tin nhắn đã hết hạn hoặc bị từ chối:

<?php

namespace App\Service;

use PhpAmqpLib\Message\AMQPMessage;

class DelayedConsumer extends RabbitMqBase
{
    public static function consumeDelayed(): void
    {
        $conn = self::createConnection();
        $ch = $conn->channel();

        $callback = function (AMQPMessage $msg) {
            echo "[✓] Delayed message processed: " . $msg->body . " at " . date('Y-m-d H:i:s') . "\n";
            $msg->ack(); // Xác nhận hoàn tất
        };

        $ch->basic_consume(self::DLX_QUEUE, '', false, true, false, false, $callback);

        while ($ch->is_consuming()) {
            $ch->wait();
        }

        $ch->close();
        $conn->close();
    }
}

Lưu ý quan trọng

  • Loại exchange: FANOUT broadcast tin nhắn đến mọi queue được bind, trong khi DIRECT yêu cầu routing key khớp chính xác.
  • Dead Letter Conditions: Tin nhắn trở thành dead letter khi:
    1. Bị từ chối với requeue=false,
    2. Hết thời gian sống (TTL),
    3. Hàng đợi đạt giới hạn độ dài.
  • Delay Queue: Không khởi chạy MainConsumer → tin nhắn tự động chuyển sang DLX sau TTL → hiệu ứng delay.
  • Xác nhận thủ công: Sử dụng basic_ack để xác nhận xử lý thành công, basic_nack(..., false) để từ chối vĩnh viễn.
  • Consumer phải chạy trong môi trường CLI, trong khi Producer có thể gọi từ HTTP nếu cần.

Luồng dữ liệu: Producer → main.exchange → main.queue (TTL=5s) → [hết hạn] → dlx.exchange → dlx.queue → DelayedConsumer

Thẻ: php rabbitmq laravel delay-queue dead-letter-exchange

Đăng vào ngày 24 tháng 5 lúc 19:50