Tổng Quan Về Message Queue Và RabbitMQ
Message Queue (Hàng đợi tin nhắn) hay còn gọi là MQ, là phương thức giao tiếp giữa các ứng dụng dựa trên mô hình Producer-Consumer. Một bên tạo ra và đẩy dữ liệu vào hàng đợi, trong khi bên kia trích xuất và xử lý thông tin đó. RabbitMQ là một trong những giải pháp triển khai MQ phổ biến nhất, hỗ trợ giao thức AMQP.
Các Khái Niệm Cốt Lõi
- Queue (Hàng đợi): Nơi lưu trữ tin nhắn tạm thời cho đến khi người tiêu dùng sẵn sàng nhận chúng.
- Producer (Người sản xuất): Ứng dụng gửi dữ liệu vào hệ thống.
- Consumer (Người tiêu dùng): Ứng dụng lấy dữ liệu từ hàng đợi để xử lý.
Nhiều Consumer có thể cùng lắng nghe một Queue. RabbitMQ sẽ phân phối tin nhắn theo cơ chế cân bằng tải (round-robin), đảm bảo mỗi Consumer nhận được một lượng công việc tương đương nhau.
Mô Hình Exchange Và Binding
Khác với suy nghĩ thông thường, Producer không gửi trực tiếp vào Queue. Thay vào đó, nó chuyển đến Exchange. Exchange chịu trách nhiệm định tuyến thông báo tới các Queue phù hợp thông qua quy tắc Binding.
Có ba kiểu Exchange chính:
- Fanout: Chuyển phát tất cả thông báo nhận được tới mọi Queue đã liên kết với nó (tương tự Broadcast).
- Direct: Chuyển phát thông báo chỉ khi Binding key khớp chính xác với Routing key.
- Topic: Định tuyến dựa trên mẫu (pattern matching). Sử dụng ký hiệu
*cho một từ khóa và#cho một hoặc nhiều từ khóa.
Xây Dựng Môi Trường Spring Boot Kết Nối RabbitMQ
Dưới đây là hướng dẫn thiết lập cấu trúc dự án sử dụng Spring Boot để làm việc với RabbitMQ.
1. Cấu Hình Phụ Thuộc (Maven)
Cập nhật file pom.xml để bao gồm các starter cần thiết cho Web, AMQP và Template Engine.
<dependencies>
<!-- Hỗ trợ RabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Hỗ trợ Web MVC -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Hỗ trợ FreeMarker nếu cần hiển thị view -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-freemarker</artifactId>
</dependency>
</dependencies>
2. Cấu Hình Kết Nối (Application Properties)
Trong file application.properties hoặc application.yml, khai báo thông tin server RabbitMQ.
server.port=8080
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
# Cấu hình Listener Container
spring.rabbitmq.listener.simple.concurrency=3
spring.rabbitmq.listener.simple.max-concurrency=10
spring.rabbitmq.listener.simple.acknowledge-mode=manual
3. Cấu Hình Bean Bằng Java
Tạo lớp cấu hình riêng biệt để quản lý ConnectionFactory và các Component của RabbitMQ một cách linh hoạt hơn.
package com.techguide.demo.config;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleRabbitListenerContainerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfiguration {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setPort(5672);
return factory;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {
return new RabbitTemplate(factory);
}
@Bean
public SimpleRabbitListenerContainerFactory containerFactory(ConnectionFactory factory) {
SimpleRabbitListenerContainerFactory factoryBean = new SimpleRabbitListenerContainerFactory();
factoryBean.setConnectionFactory(factory);
// Thiết lập số luồng xử lý tối thiểu và tối đa
factoryBean.setConcurrentConsumers(3);
factoryBean.setMaxConcurrentConsumers(10);
factoryBean.setAcknowledgeMode(AcknowledgeMode.MANUAL); // Xác nhận thủ công
return factoryBean;
}
}
4. Định Nghĩa Hàng Đợi Và Exchange
Sử dụng các Bean để khởi tạo các thành phần mạng.
Mô Hình Producer-Consumer Đơn Giản
@Configuration
public class BasicQueueConfig {
@Bean
public Queue taskQueue() {
return new Queue("process-task-queue", true);
}
}
Mô Hình Publish/Subscribe (Fanout)
@Configuration
public class FanoutConfig {
@Bean
public Queue logQueueA() { return new Queue("log-queue-a"); }
@Bean
public Queue logQueueB() { return new Queue("log-queue-b"); }
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("system-events");
}
@Bean
public Binding bindLogA(FanoutExchange ex, Queue qA) {
return BindingBuilder.bind(qA).to(ex);
}
@Bean
public Binding bindLogB(FanoutExchange ex, Queue qB) {
return BindingBuilder.bind(qB).to(ex);
}
}
Mô Hình Topic Exchange
@Configuration
public class TopicConfig {
@Bean
public TopicExchange newsExchange() {
return new TopicExchange("news-topic");
}
@Bean
public Queue techNewsQueue() { return new Queue("tech-news"); }
@Bean
public Queue allNewsQueue() { return new Queue("all-news"); }
@Bean
public Binding bindTech(TopicExchange ex, Queue q) {
// Chỉ nhận tin thuộc nhóm technology
return BindingBuilder.bind(q).to(ex).with("technology.*");
}
@Bean
public Binding bindAll(TopicExchange ex, Queue q) {
// Nhận tất cả tin tức
return BindingBuilder.bind(q).to(ex).with("#");
}
}
Xử Lý Tin Nhắn (Sender & Receiver)
Đối Tượng Tin Nhắn
Đảm bảo các đối tượng truyền tải phải implement Serializable để thực hiện Serialization đúng cách.
package com.techguide.demo.model;
import java.io.Serializable;
public class EventData implements Serializable {
private static final long serialVersionUID = 1L;
private String eventId;
private String description;
private double priority;
public EventData(String id, String desc, double pri) {
this.eventId = id;
this.description = desc;
this.priority = pri;
}
// Getters and Setters
}
Phía Gửi Tin (Producer)
Sử dụng RabbitTemplate để phát tán tin nhắn đến Exchange cụ thể kèm theo Routing Key.
package com.techguide.demo.service;
import com.techguide.demo.model.EventData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
@Service
public class NotificationService {
private final RabbitTemplate template;
public NotificationService(RabbitTemplate template) {
this.template = template;
}
public void sendToDirect(String routingKey, EventData data) {
template.convertAndSend("direct-exchange-name", routingKey, data);
}
public void sendToTopic(String routePattern, EventData data) {
template.convertAndSend("topic-exchange-name", routePattern, data);
}
}
Phía Nhận Tin (Consumer)
Sử dụng annotation @RabbitListener để kích hoạt quá trình lắng nghe. Khi cấu hình Acknowledge Mode là MANUAL, bắt buộc phải gửi tín hiệu xác nhận thủ công.
package com.techguide.demo.listener;
import com.rabbitmq.client.Channel;
import com.techguide.demo.model.EventData;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "process-task-queue")
public class TaskProcessor {
@RabbitHandler
public void handleTask(EventData event, Channel channel, Message msg) throws Exception {
try {
System.out.println("Đã nhận nhiệm vụ ID: " + event.getEventId());
// Logic xử lý nghiệp vụ tại đây
// Đánh dấu tin nhắn đã xử lý thành công
long deliveryTag = msg.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
// Nếu xảy ra lỗi, trả lại hàng đợi hoặc hủy bỏ
long deliveryTag = msg.getMessageProperties().getDeliveryTag();
channel.basicNack(deliveryTag, false, true); // Requeue = true
throw e;
}
}
}
Với cấu trúc này, hệ thống đảm bảo tính ổn định khi xử lý thông tin bất đồng bộ, đồng thời linh hoạt trong việc điều phối dữ liệu giữa các thành phần ứng dụng.