Xây dựng Hệ thống RPC với RabbitMQ và PHP

Giới thiệu về RPC Trong bài hướng dẫn thứ hai, chúng ta đã học cách sử dụng hàng đợi công việc để phân phối các tác vụ tốn thời gian giữa nhiều worker.

Nhưng nếu chúng ta cần thực thi một hàm trên máy tính từ xa và chờ kết quả thì sao? Đó là một câu chuyện khác. Mô hình này thường được gọi là "gọi thủ tục từ xa" (RPC).

Trong hướng dẫn này, chúng ta sẽ xây dựng một hệ thống RPC sử dụng RabbitMQ: một client và một server RPC có khả năng mở rộng. Vì chúng ta không có tác vụ tốn thời gian đáng để phân phối, chúng ta sẽ tạo một dịch vụ RPC ảo trả về số Fibonacci.

Giao diện Client

Để minh họa cách sử dụng dịch vụ RPC, chúng ta sẽ tạo một lớp client đơn giản. Nó sẽ công khai một phương thức có tên call, gửi yêu cầu RPC và chặn cho đến khi nhận được câu trả lời:

$fibonacci_rpc = new FibonacciRpcClient();
$response = $fibonacci_rpc->call(30);
echo ' [.] Kết nhận được ', $response, "\n";

Lưu ý về RPC Mặc dù RPC là một mô hình rất phổ biến trong tính toán, nó thường bị chỉ trích. Vấn đề phát sinh khi lập trình viên không biết rõ hàm gọi là cục bộ hay là RPC chậm chạp. Sự nhầm lẫn này có thể dẫn đến hệ thống không ổn định và tăng độ phức tạp cho việc gỡ lỗi. Lạm dụng RPC có thể tạo ra mã spaghetti không thể bảo trì, thay vì đơn giản hóa phần mềm.

Lưu ý điều này, hãy xem xét các đề xuất sau:

  • Đảm bảo rõ ràng hàm gọi nào là cục bộ, hàm nào là từ xa.
  • Ghi lại hệ thống của bạn. Xác định rõ mối quan hệ phụ thuộc giữa các thành phần.
  • Xử lý các trường hợp lỗi. Client nên phản ứng như thế nào khi server RPC bị đóng trong thời gian dài?

Nếu có nghi ngờ, hãy tránh sử dụng RPC. Nếu có thể, hãy sử dụng đường ống bất đồng bộ - thay vì chặn kiểu RPC, đẩy kết quả bất đồng bộ đến giai đoạn tính toán tiếp theo.

Hàng đợi Callback

Thông thường, thực hiện RPC qua RabbitMQ khá dễ dàng. Client gửi tin nhắn yêu cầu, server gửi tin nhắn phản hồi. Để nhận phản hồi, chúng ta cần gửi địa chỉ hàng đợi "callback" cùng với yêu cầu. Chúng ta có thể sử dụng hàng đợi mặc định. Hãy thử:

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$msg = new AMQPMessage(
    $payload,
    array('reply_to' => $queue_name)
);

$channel->basic_publish($msg, '', 'rpc_queue');

# ... sau đó là mã để đọc tin nhắn phản hồi từ callback_queue ...

Thuộc tính tin nhắn Giao thức AMQP 0-9-1 đã định nghĩa trước 14 tập thuộc tính đi kèm với tin nhắn. Ngoài các thuộc tính sau, hầu hết các thuộc tính khác ít khi được sử dụng:

  • delivery_mode: Đánh dấu tin nhắn là tin nhắn bền vững (giá trị 2) hoặc nhất thời (1). Bạn có thể nhớ thuộc tính này trong bài hướng dẫn thứ hai.
  • content_type: Mô tả kiểu mime mã hóa. Ví dụ, đối với JSON encoding thường được sử dụng, tốt nhất nên đặt thuộc tính này thành application/json.
  • reply_to: Thường được sử dụng để đặt tên cho hàng đợi callback.
  • correlation_id: Được sử dụng để liên kết phản hồi RPC với yêu cầu.

ID tương quan

Trong phương pháp được giới thiệu ở trên, chúng ta đã đề xuất tạo một hàng đợi callback cho mỗi yêu cầu RPC. Đó là một cách khá kém hiệu quả, nhưng may mắn là có cách tốt hơn - hãy tạo một hàng đợi callback cho mỗi client.

Điều này đặt ra một vấn đề mới: sau khi nhận được phản hồi trong hàng đợi, chúng ta không rõ phản hồi thuộc về yêu cầu nào. Đó là lúc sử dụng thuộc tính correlation_id. Chúng ta sẽ đặt nó thành một giá trị duy nhất cho mỗi yêu cầu. Sau này, khi nhận được tin nhắn trong hàng đợi callback, chúng ta sẽ kiểm tra thuộc tính này và dựa vào nó để khớp phản hồi với yêu cầu. Nếu chúng ta thấy một giá trị correlation_id không xác định, chúng ta có thể yên tâm bỏ qua tin nhắn đó - nó không thuộc về yêu cầu của chúng ta.

Bạn có thể tự hỏi, tại sao chúng ta nên bỏ qua các tin nhắn không xác định trong hàng đợi callback thay vì thất bại do lỗi? Điều này là do có thể xảy ra điều kiện cạnh tranh ở phía server. Mặc dù khả năng này không lớn, server RPC có thể chết sau khi gửi câu trả lời cho chúng ta nhưng trước khi gửi tin nhắn xác nhận yêu cầu. Nếu điều này xảy ra, server RPC được khởi động lại sẽ xử lý lại yêu cầu đó. Đó là lý do tại sao ở phía client, chúng ta phải xử lý đúng các phản hồi trùng lặp, và lý tưởng nhất là RPC nên có tính chất幂 đẳng.

Tóm tắt

Hệ thống RPC của chúng ta sẽ hoạt động như sau:

  • Khi client khởi động, nó sẽ tạo một hàng đợi callback ẩn danh và độc quyền.
  • Đối với yêu cầu RPC, client gửi một tin nhắn có hai thuộc tính: reply_to (đặt thành hàng đợi callback) và correlation_id (đặt thành giá trị duy nhất cho mỗi yêu cầu).
  • Yêu cầu được gửi đến hàng đợi rpc_queue.
  • Worker RPC (còn gọi là server) đang chờ đợi các yêu cầu trên hàng đợi này. Khi có yêu cầu, nó sẽ thực hiện công việc và gửi tin nhắn kết quả trở lại client bằng cách sử dụng hàng đợi trong trường reply_to.
  • Client chờ đợi dữ liệu trên hàng đợi callback. Khi có tin nhắn, nó sẽ kiểm tra thuộc tính correlation_id. Nếu nó khớp với giá trị trong yêu cầu, nó sẽ trả lại phản hồi cho ứng dụng.

Triển khai

Hàm Fibonacci:

function tinh_fibonacci($n)
{
    if ($n == 0) {
        return 0;
    }
    if ($n == 1) {
        return 1;
    }
    return tinh_fibonacci($n-1) + tinh_fibonacci($n-2);
}

Chúng ta khai báo hàm Fibonacci của mình. Nó chỉ giả định đầu vào là số nguyên dương hợp lệ. (Đừng mong đợi phương pháp này hoạt động tốt với số lượng lớn người dùng, nó có thể là cách triển khai đệm chậm nhất).

Mã cho server RPC của chúng ta (rpc_server.php) như sau:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$ket_noi = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$kenh = $ket_noi->channel();

$kenh->queue_declare('rpc_queue', false, false, false, false);

function tinh_fibonacci($n)
{
    if ($n == 0) {
        return 0;
    }
    if ($n == 1) {
        return 1;
    }
    return tinh_fibonacci($n-1) + tinh_fibonacci($n-2);
}

echo " [x] Đang chờ yêu cầu RPC\n";
$callback = function ($yeu_cau) {
    $n = intval($yeu_cau->body);
    echo ' [.] tinh_fibonacci(', $n, ")\n";

    $thong_bao = new AMQPMessage(
        (string) tinh_fibonacci($n),
        array('correlation_id' => $yeu_cau->get('correlation_id'))
    );

    $yeu_cau->delivery_info['kenh']->basic_publish(
        $thong_bao,
        '',
        $yeu_cau->get('reply_to')
    );
    $yeu_cau->delivery_info['kenh']->basic_ack(
        $yeu_cau->delivery_info['delivery_tag']
    );
};

$kenh->basic_qos(null, 1, null);
$kenh->basic_consume('rpc_queue', '', false, false, false, false, $callback);

while ($kenh->is_consuming()) {
    $kenh->wait();
}

$kenh->close();
$ket_noi->close();

Mã server rất đơn giản:

  • Như thường lệ, chúng ta đầu tiên thiết lập kết nối, kênh và khai báo hàng đợi.
  • Chúng ta có thể muốn chạy nhiều tiến trình server. Để phân bổ tải đều đặn trên nhiều server, chúng ta cần đặt giá trị prefetch_count trong $kenh->basic_qos.
  • Chúng ta sử dụng basic_consume để truy cập hàng đợi. Sau đó, chúng ta vào vòng lặp while, trong đó chờ tin nhắn yêu cầu, thực hiện công việc và gửi phản hồi trở lại.

Mã cho client RPC của chúng ta (rpc_client.php):

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class FibonacciRpcClient
{
    private $ket_noi;
    private $kenh;
    private $hang_doi_callback;
    private $phan_hoi;
    private $corr_id;

    public function __construct()
    {
        $this->ket_noi = new AMQPStreamConnection(
            'localhost',
            5672,
            'guest',
            'guest'
        );
        $this->kenh = $this->ket_noi->channel();
        list($this->hang_doi_callback, ,) = $this->kenh->queue_declare(
            "",
            false,
            false,
            true,
            false
        );
        $this->kenh->basic_consume(
            $this->hang_doi_callback,
            '',
            false,
            true,
            false,
            false,
            array(
                $this,
                'xuLyPhanHoi'
            )
        );
    }

    public function xuLyPhanHoi($ph)
    {
        if ($ph->get('correlation_id') == $this->corr_id) {
            $this->phan_hoi = $ph->body;
        }
    }

    public function goi($n)
    {
        $this->phan_hoi = null;
        $this->corr_id = uniqid();

        $thong_bao = new AMQPMessage(
            (string) $n,
            array(
                'correlation_id' => $this->corr_id,
                'reply_to' => $this->hang_doi_callback
            )
        );
        $this->kenh->basic_publish($thong_bao, '', 'rpc_queue');
        while (!$this->phan_hoi) {
            $this->kenh->wait();
        }
        return intval($this->phan_hoi);
    }
}

$fibonacci_rpc = new FibonacciRpcClient();
$ket_qua = $fibonacci_rpc->goi(30);
echo ' [.] Nhận được ', $ket_qua, "\n";

Bây giờ là lúc xem xét mã nguồn đầy đủ của các ví dụ rpc_client.php và rpc_server.php.

Dịch vụ RPC của chúng ta đã sẵn sàng. Chúng ta có thể khởi động server:

php rpc_server.php
# => [x] Đang chờ yêu cầu RPC

Để yêu cầu số Fibonacci, hãy chạy client:

php rpc_client.php
# => [x] Yêu cầu tinh_fibonacci(30)

Thiết kế được giới thiệu ở đây không phải là cách triển khai duy nhất có thể cho dịch vụ RPC, nhưng nó có một số ưu điểm quan trọng:

  • Nếu server RPC quá chậm, chúng ta có thể mở rộng bằng cách chạy một server RPC khác. Hãy thử chạy một rpc_server.php khác trong console mới.
  • Ở phía client, RPC chỉ cần gửi và nhận một tin nhắn. Không cần các cuộc gọi đồng bộ như queue_declare. Do đó, client RPC chỉ cần một chuyến đi mạng để xử lý một yêu cầu RPC duy nhất.

Mã của chúng ta vẫn rất đơn giản và không cố gắng giải quyết các vấn đề phức tạp hơn (nhưng rất quan trọng), chẳng hạn như:

  • Nếu không có server nào đang chạy, client nên phản ứng như thế nào?
  • Client có nên đặt một loại thời gian chờ cho RPC không?
  • Nếu server gặp sự cố và ném ra ngoại lệ, có nên chuyển tiếp cho client không?
  • Ngăn chặn các tin nhắn đến không hợp lệ trước khi xử lý (ví dụ: kiểm tra giới hạn, kiểu dữ liệu).

Thẻ: rabbitmq php RPC Message Queue fibonacci

Đăng vào ngày 21 tháng 6 lúc 02:50