Thiết lập RocketMQ bằng Docker
- Tạo thư mục dữ liệu cho NameServer
mkdir -p /du_lieu/rocketmq/namesrv/logs
mkdir -p /du_lieu/rocketmq/namesrv/store
- Tìm kiếm các image RocketMQ có sẵn
docker search rocketmq
- Lựa chọn một image được đánh giá cao, ví dụ: rocketmqinc/rocketmq
docker pull rocketmqinc/rocketmq
- Khởi động dịch vụ NameServer
docker run -d -p 9876:9876 -v /du_lieu/rocketmq/namesrv/logs:/home/logs -v /du_lieu/rocketmq/namesrv/store:/home/store --name mqnamesrv -e "MAX_HEAP_SIZE=100000000" docker.io/rocketmqinc/rocketmq sh mqnamesrv
- Cài đặt Broker
Tạo thư mục chứa cấu hình của Broker
mkdir -p /du_lieu/rocketmq/broker/logs
mkdir -p /du_lieu/rocketmq/broker/store
mkdir -p /du_lieu/rocketmq/broker/conf/
- Thêm file cấu hình cho Broker tại đường dẫn /du_lieu/rocketmq/broker/conf/broker.conf:
brokerClusterName = ClusterDefault
brokerName = broker-x
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# Địa chỉ IP của máy chủ
brokerIP1 = 192.168.88.50
- Chạy container Broker
docker run -d -p 10911:10911 -p 10909:10909 -v /du_lieu/rocketmq/broker/logs:/home/logs -v /du_lieu/rocketmq/broker/store:/home/store -v /du_lieu/rocketmq/broker/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf --name mqbroker --link mqnamesrv:mqnamesrv -e "NAMESRV_ADDR=192.168.88.50:9876" -e "MAX_HEAP_SIZE=200000000" rocketmqinc/rocketmq:4.4.0 sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf
- Cài đặt bảng điều khiển
docker search rocketmq-console
Lựa chọn styletang/rocketmq-console-ng
docker pull styletang/rocketmq-console-ng
- Khởi chạy bảng điều khiển
docker run -d -p 8080:8080 -e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=192.168.88.50:9876 -Drocketmq.config.isVIPChannel=false" -t styletang/rocketmq-console-ng
Truy cập bảng điều khiển qua trình duyệt
http://192.168.88.50:8080
Cấu hình Spring Boot với Log4j2
-
Thêm chủ đề mới trong RocketMQ platform tên là Log4j2ToRocketMq
-
Cấu hình log4j2.xml trong dự án Spring Boot
Vì log4j2 chưa hỗ trợ RocketMQ, chúng ta sẽ mở rộng AbstractAppender để tạo RocketMqAppender riêng.
Phần Maven dependencies:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
Thực hiện RocketMqAppender:
package com.doan.project.loggers;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.appender.AppenderLoggingException;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.core.layout.PatternLayout;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.io.Serializable;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@Plugin(name = "CustomRocketMqAppender", category = "Core", elementType = "appender", printObject = true)
public final class CustomRocketMqAppender extends AbstractAppender implements Serializable {
private static final long serialVersionUID = 1L;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();
private static DefaultMQProducer producer;
private static String rmqServer;
private static String rmqGroup;
private static String rmqTopic;
private static String rmqTag;
private CustomRocketMqAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout, final boolean ignoreExceptions, String server, String group, String topic, String tag) {
super(name, filter, layout, ignoreExceptions);
initRmq(server, group, topic, tag);
}
private void initRmq(String server, String group, String topic, String tag) {
if (producer == null) {
rmqServer = server;
rmqGroup = group;
rmqTopic = topic;
rmqTag = tag;
producer = new DefaultMQProducer(rmqGroup);
producer.setNamesrvAddr(rmqServer);
try {
producer.shutdown();
producer.start();
} catch (Exception e) {
System.out.println(e);
}
}
}
@Override
public void append(LogEvent event) {
readLock.lock();
try {
byte[] data = getLayout().toByteArray(event);
if (producer != null) {
Message msg = new Message(rmqTopic, rmqTag, data);
producer.sendOneway(msg);
}
} catch (Exception ex) {
throw new AppenderLoggingException(ex);
} finally {
readLock.unlock();
}
}
@PluginFactory
public static CustomRocketMqAppender createAppender(@PluginAttribute("name") String name,
@PluginAttribute("server") String server,
@PluginAttribute("group") String group,
@PluginAttribute("topic") String topic,
@PluginAttribute("tag") String tag,
@PluginElement("Filter") final Filter filter,
@PluginElement("Layout") Layout<? extends Serializable> layout,
@PluginAttribute("ignoreExceptions") boolean ignoreExceptions) {
if (name == null || server == null || group == null || topic == null) {
return null;
}
if (layout == null) {
layout = PatternLayout.createDefaultLayout();
}
return new CustomRocketMqAppender(name, filter, layout, ignoreExceptions, server, group, topic, tag);
}
}
File cấu hình log4j2.xml:
<configuration status="WARN" monitorInterval="30">
<Properties>
<Property name="log_format">[%d{yyyy-MM-dd HH:mm:ss.SSS}] [%p] [%t] - %l - %m%n</Property>
<Property name="log_dir">/tmp/log_files</Property>
<Property name="max_size">30MB</Property>
<Property name="backup_folder">${date:yyyy-MM}</Property>
<Property name="backup_suffix">-%d{yyyy-MM-dd}-%i.log</Property>
</Properties>
<appenders>
<console name="Console" target="SYSTEM_OUT">
<ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
<PatternLayout pattern="${log_format}"/>
</console>
<RollingFile name="InfoLog" fileName="${log_dir}/info.log" filePattern="${log_dir}/${backup_folder}/info${backup_suffix}">
<ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
<PatternLayout pattern="${log_format}"/>
<Policies>
<TimeBasedTriggeringPolicy/>
<SizeBasedTriggeringPolicy size="${max_size}"/>
</Policies>
<DefaultRolloverStrategy max="20"/>
</RollingFile>
<CustomRocketMqAppender name="RmqAppender" server="192.168.88.50:9876" group="SpringBootGroup" topic="Log4j2ToRocketMq" tag="boot-app">
<PatternLayout pattern="${log_format}"/>
</CustomRocketMqAppender>
</appenders>
<loggers>
<logger name="org.springframework" level="INFO"/>
<root level="INFO">
<appender-ref ref="Console"/>
<appender-ref ref="InfoLog"/>
<appender-ref ref="RmqAppender"/>
</root>
</loggers>
</configuration>
Xử lý tin nhắn từ Queue
package com.doan.project.consumer;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(consumerGroup = "ConsumerGroup", topic = "Log4j2ToRocketMq")
public class RmqConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
// Xử lý thông điệp nhận được
}
}