Tác dụng của Message Queue
Giải phóng liên kết ứng dụng
Giảm sự phụ thuộc giữa các hệ thống, khi một ứng dụng gặp lỗi sẽ không ảnh hưởng đến ứng dụng khác, nâng cao độ ổn định hệ thống.
Điều tiết lưu lượng
Trong giờ cao điểm, ngăn chặn việc tất cả yêu cầu đồng thời truy cập cơ sở dữ liệu. Hàng đợi đóng vai trò bộ đệm, giúp yêu cầu được xử lý ổn định, có thể làm chậm phản hồi nhưng tránh được sự cố hệ thống.
Xử lý bất đồng bộ
Cân bằng tốc độ xử lý không đồng đều giữa các dịch vụ, xử lý tác vụ theo cơ chế không đồng bộ.
Khái niệm cơ bản
Broker: Ứng dụng tiếp nhận và phân phối tin nhắn
Virtual host: Phân nhóm ảo cho các thành phần AMQP
Connection: Kết nối TCP giữa producer/consumer và broker
Channel: Kết nối logic trong Connection, giảm chi phí thiết lập TCP
Exchange: Điểm tiếp nhận đầu tiên, định tuyến tin nhắn tới hàng đợi
Queue: Nơi lưu trữ tin nhắn chờ xử lý
Binding: Liên kết ảo giữa exchange và queue
Xác nhận tiêu thụ tin nhắn
Xác nhận tự động
channel.consume("ten_hang_doi", auto_ack=True)
Rủi ro mất tin nhắn nếu kết nối đóng trước khi xử lý
Xác nhận thủ công
# Xác nhận thành công
channel.basic_ack(delivery_tag=msg_id)
# Từ chối tin nhắn
channel.basic_nack(delivery_tag=msg_id, requeue=False)
channel.basic_reject(delivery_tag=msg_id, requeue=True)
Kiểm soát luồng với QOS
channel.basic_qos(prefetch_count=5)
Giới hạn số lượng tin nhắn chưa xác nhận
Duy trì dữ liệu
Exchange bền vững
channel.exchange_declare("exchange_test", durable=True)
Hàng đợi bền vững
channel.queue_declare(queue="hang_doi_test", durable=True)
Tin nhắn bền vững
properties = pika.BasicProperties(delivery_mode=2)
channel.basic_publish(exchange='', routing_key='test', body=tin_nhan, properties=properties)
Xác nhận nhà sản xuất
channel.confirm_delivery()
Broker gửi xác nhận sau khi tin nhắn được ghi vào hàng đợi
Loại Exchange
Fanout
Phát tin nhắn tới tất cả hàng đợi liên kết
Direct
Định tuyến dựa trên routing key chính xác
Topic
Định tuyến linh hoạt sử dụng ký tự đại diện (*, #)
Hàng đợi tin nhắn chết
Nguồn gốc: Tin nhắn hết hạn, hàng đợi đầy, bị từ chối
thuoc_tinh = {"x-dead-letter-exchange": "exchange_chet", "x-dead-letter-routing-key": "key_chet"}
channel.queue_declare("hang_doi_chinh", arguments=thuoc_tinh)
Hàng đợi trì hoãn
Thiết lập TTL cho tin nhắn:
thuoc_tinh = pika.BasicProperties(expiration="30000")
channel.basic_publish(exchange='', routing_key='test', body=tin_nhan, properties=thuoc_tinh)
Thiết lập TTL cho hàng đợi:
thuoc_tinh = {"x-message-ttl": 60000}
channel.queue_declare("hang_doi_cho", arguments=thuoc_tinh)
Kết hợp TTL và hàng đợi tin nhắn chết để thực hiện xử lý trì hoãn