Tổng quan về Message Tracing
Trong hệ thống phân tán sử dụng RocketMQ, khả năng quan sát toàn bộ vòng đời của một tin nhắn là cực kỳ quan trọng. Tính năng Message Tracing (đường đi tin nhắn) cho phép ghi nhận chi tiết trạng thái của tin nhắn tại mọi điểm: từ khi nhà sản xuất (Producer) gửi đi, qua quá trình lưu trữ tại Broker, cho đến khi được tiêu thụ bởi Consumer.
Thông qua dữ liệu này, đội ngũ kỹ thuật có thể nhanh chóng chẩn đoán các sự cố như tin nhắn bị thất lạc, tiêu thụ trùng lặp, hoặc độ trễ cao. Ngoài ra, tính năng này cũng hỗ trợ giám sát chỉ số hiệu suất và lưu trữ nhật ký kiểm toán để tuân thủ các quy định về bảo mật.
Thông số chính của dữ liệu:
- Thời gian gửi và nhận tin nhắn.
- Trạng thái xử lý tại từng node.
- Thông tin định danh (Unique Key) để truy vết.
Cấu hình và Chế độ hoạt động
Để kích hoạt tính năng này trên phía Broker, bạn cần thiết lập thuộc tính traceTopicEnable=true trong tệp cấu hình broker.properties.
brokerClusterName=ProductionCluster
brokerName=broker-main
brokerId=0
deleteWhen=04
fileReservedTime=72
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
storePathRootDir=/var/rocketmq/data
storePathCommitLog=/var/rocketmq/logs
## Kích hoạt tính năng truy vết tin nhắn
traceTopicEnable=true
listenPort=10911
namesrvAddr=192.168.1.100:9876
RocketMQ hỗ trợ hai mô hình triển khai chính để lưu trữ dữ liệu truy vết:
1. Chế độ tiêu chuẩn (Normal Mode)
Trong chế độ này, dữ liệu truy vết được lưu trữ chung trên các Broker phục vụ nghiệp vụ. Việc triển khai đơn giản, không yêu cầu tài nguyên bổ sung. Tuy nhiên, nhược điểm là có thể xảy ra cạnh tranh tài nguyên IO, ảnh hưởng đến throughput của hệ thống trong các kịch bản tải cao.
2. Chế độ cô lập IO (Physical IO Isolation)
Để đảm bảo hiệu suất cao nhất cho nghiệp vụ chính, dữ liệu truy vết được tách ra và lưu trữ trên các Broker chuyên dụng. Điều này loại bỏ việc tranh chấp tài nguyên IO giữa tin nhắn nghiệp vụ và tin nhắn log. Chế độ này khuyến nghị cho các hệ thống có lưu lượng tin nhắn truy vết lớn hoặc yêu cầu độ ổn định cao.
Nguyên lý hoạt động
Quá trình thu thập dữ liệu truy vết dựa trên cơ chế Hook (điểm neo) tích hợp sẵn trong Client SDK.
- Kích hoạt (Hooking): Khi Producer gửi tin hoặc Consumer nhận tin, các sự kiện này sẽ bị chặn bởi các Hook có sẵn.
- Bộ đệm (Buffering): Dữ liệu truy vết không được gửi ngay lập tức để tránh làm chậm luồng chính. Thay vào đó, nó được đưa vào hàng đợi bộ nhớ.
- Gửi bất đồng bộ (Async Sending): Một luồng riêng biệt sẽ đọc dữ liệu từ bộ nhớ đệm và gửi đến một Topic chuyên dụng lưu trữ truy vết (mặc định là
RMQ_SYS_TRACE_TOPIC).
Người dùng có thể truy xuất dữ liệu này thông qua bảng điều khiển quản lý (Console) hoặc API của RocketMQ.
Lựa chọn Topic lưu trữ Truy vết
RocketMQ cung cấp hai tùy chọn để lưu trữ dữ liệu thu thập được:
1. Topic mặc định (System Built-in)
Tên Topic là RMQ_SYS_TRACE_TOPIC. RocketMQ tự động quản lý Topic này. Khi người dùng bật cờ enableMsgTrace(true) trên Client mà không chỉ định Topic cụ thể, dữ liệu sẽ đổ về đây. Cách này phù hợp cho việc demo hoặc kiểm thử nhanh.
2. Topic tùy chỉnh (Custom Topic)
Người dùng có thể tự định nghĩa một Topic riêng (ví dụ: MySystem_Audit_Topic) để chứa dữ liệu truy vết. Cách này giúp kiểm soát tốt hơn về chính sách lưu trữ, phân vùng hoặc tách biệt dữ liệu nhạy cảm.
Ví dụ triển khai mã nguồn với Custom Trace Topic
Dưới đây là ví dụ minh họa việc bật tính năng truy vết và sử dụng Topic tùy chỉnh. Chúng ta sẽ thay đổi bối cảnh từ đơn hàng sang sự kiện người dùng để làm mới mã nguồn.
1. Cấu hình Producer gửi tin kèm truy vết
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class TraceEventProducer {
public static void main(String[] args) throws Exception {
// Khởi tạo Producer với tên nhóm, bật trace=true, và chỉ định topic truy vết riêng
DefaultMQProducer producer = new DefaultMQProducer("Event_Trace_Group", true, "MySystem_Audit_Topic");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
try {
// Tạo tin nhắn sự kiện người dùng đăng nhập
Message eventMsg = new Message(
"UserActivityTopic", // Topic nghiệp vụ
"LoginEvent", // Tag
"UserID_998877", // Key định danh
"{\"timestamp\":\"2023-10-27T10:00:00\", \"status\":\"success\"}".getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// Gửi tin nhắn
var result = producer.send(eventMsg);
System.out.printf("Gửi sự kiện thành công: %s%n", result.getSendStatus());
} finally {
producer.shutdown();
}
}
}
2. Cấu hình Consumer nhận tin kèm truy vết
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
public class TraceEventConsumer {
public static void main(String[] args) throws Exception {
// Khởi tạo Consumer, bật trace và sử dụng chung topic truy vết với Producer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Event_Trace_Consumer_Group", true, "MySystem_Audit_Topic");
consumer.setNamesrvAddr("127.0.0.1:9876");
// Đăng ký nhận tin nhắn từ Topic nghiệp vụ
consumer.subscribe("UserActivityTopic", "LoginEvent");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
msgs.forEach(msg -> {
System.out.println("Xử lý sự kiện: " + new String(msg.getBody()));
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.println("Consumer đã sẵn sàng lắng nghe sự kiện...");
}
}
3. Kiểm tra và Xác thực
Để đảm bảo hệ thống hoạt động đúng, bạn cần kiểm tra sự tồn tại của Topic tùy chỉnh trên Broker:
sh mqadmin updateTopic -n 127.0.0.1:9876 -t MySystem_Audit_Topic -c ProductionCluster
Sau khi chạy Producer, dữ liệu truy vết sẽ được ghi vào MySystem_Audit_Topic. Bạn có thể truy vấn lại thông qua lệnh:
sh mqadmin queryMsgByOffset -n 127.0.0.1:9876 -t MySystem_Audit_Topic -b 192.168.1.100:10911 -i 0 -o 0