Cài đặt RocketMQ trên Docker và cấu hình log4j2 để thu thập nhật ký

Thiết lập RocketMQ bằng Docker

  1. Tạo thư mục dữ liệu cho NameServer
mkdir -p /du_lieu/rocketmq/namesrv/logs
mkdir -p /du_lieu/rocketmq/namesrv/store
  1. Tìm kiếm các image RocketMQ có sẵn
docker search rocketmq
  1. Lựa chọn một image được đánh giá cao, ví dụ: rocketmqinc/rocketmq
docker pull rocketmqinc/rocketmq
  1. Khởi động dịch vụ NameServer
docker run -d -p 9876:9876 -v /du_lieu/rocketmq/namesrv/logs:/home/logs -v /du_lieu/rocketmq/namesrv/store:/home/store --name mqnamesrv -e "MAX_HEAP_SIZE=100000000" docker.io/rocketmqinc/rocketmq sh mqnamesrv
  1. Cài đặt Broker

Tạo thư mục chứa cấu hình của Broker

mkdir -p /du_lieu/rocketmq/broker/logs
mkdir -p /du_lieu/rocketmq/broker/store
mkdir -p /du_lieu/rocketmq/broker/conf/
  1. Thêm file cấu hình cho Broker tại đường dẫn /du_lieu/rocketmq/broker/conf/broker.conf:
brokerClusterName = ClusterDefault
brokerName = broker-x
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# Địa chỉ IP của máy chủ chạy Docker (địa chỉ mà Broker đăng ký với NameServer để client kết nối tới)
brokerIP1 = 192.168.88.50
  1. Chạy container Broker
docker run -d -p 10911:10911 -p 10909:10909 -v /du_lieu/rocketmq/broker/logs:/home/logs -v /du_lieu/rocketmq/broker/store:/home/store -v /du_lieu/rocketmq/broker/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf --name mqbroker --link mqnamesrv:mqnamesrv -e "NAMESRV_ADDR=192.168.88.50:9876" -e "MAX_HEAP_SIZE=200000000" rocketmqinc/rocketmq:4.4.0 sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf
  1. Cài đặt bảng điều khiển
docker search rocketmq-console

Lựa chọn styletang/rocketmq-console-ng

docker pull styletang/rocketmq-console-ng
  1. Khởi chạy bảng điều khiển
docker run -d -p 8080:8080 -e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=192.168.88.50:9876 -Drocketmq.config.isVIPChannel=false" -t styletang/rocketmq-console-ng

Truy cập bảng điều khiển qua trình duyệt

http://192.168.88.50:8080

Cấu hình Spring Boot với Log4j2

  1. Tạo một topic mới tên là Log4j2ToRocketMq trên bảng điều khiển RocketMQ

  2. Cấu hình log4j2.xml trong dự án Spring Boot

Vì log4j2 chưa hỗ trợ sẵn RocketMQ, chúng ta sẽ kế thừa AbstractAppender để tạo một appender riêng tên là CustomRocketMqAppender.

Phần Maven dependencies:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.4</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>

Cài đặt lớp CustomRocketMqAppender:

package com.doan.project.loggers;

import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.appender.AppenderLoggingException;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.core.layout.PatternLayout;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

import java.io.Serializable;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

@Plugin(name = "CustomRocketMqAppender", category = "Core", elementType = "appender", printObject = true)
public final class CustomRocketMqAppender extends AbstractAppender implements Serializable {
    private static final long serialVersionUID = 1L;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Lock readLock = lock.readLock();
    private static DefaultMQProducer producer;
    private static String rmqServer;
    private static String rmqGroup;
    private static String rmqTopic;
    private static String rmqTag;

    private CustomRocketMqAppender(final String name, final Filter filter, final Layout<? extends Serializable> layout, final boolean ignoreExceptions, String server, String group, String topic, String tag) {
        super(name, filter, layout, ignoreExceptions);
        initRmq(server, group, topic, tag);
    }

    private void initRmq(String server, String group, String topic, String tag) {
        if (producer == null) {
            rmqServer = server;
            rmqGroup = group;
            rmqTopic = topic;
            rmqTag = tag;
            producer = new DefaultMQProducer(rmqGroup);
            producer.setNamesrvAddr(rmqServer);
            try {
                producer.shutdown();
                producer.start();
            } catch (Exception e) {
                System.out.println(e);
            }
        }
    }

    @Override
    public void append(LogEvent event) {
        readLock.lock();
        try {
            byte[] data = getLayout().toByteArray(event);
            if (producer != null) {
                Message msg = new Message(rmqTopic, rmqTag, data);
                producer.sendOneway(msg);
            }
        } catch (Exception ex) {
            throw new AppenderLoggingException(ex);
        } finally {
            readLock.unlock();
        }
    }

    @PluginFactory
    public static CustomRocketMqAppender createAppender(@PluginAttribute("name") String name,
                                                        @PluginAttribute("server") String server,
                                                        @PluginAttribute("group") String group,
                                                        @PluginAttribute("topic") String topic,
                                                        @PluginAttribute("tag") String tag,
                                                        @PluginElement("Filter") final Filter filter,
                                                        @PluginElement("Layout") Layout<? extends Serializable> layout,
                                                        @PluginAttribute("ignoreExceptions") boolean ignoreExceptions) {
        if (name == null || server == null || group == null || topic == null) {
            return null;
        }
        if (layout == null) {
            layout = PatternLayout.createDefaultLayout();
        }
        return new CustomRocketMqAppender(name, filter, layout, ignoreExceptions, server, group, topic, tag);
    }
}

File cấu hình log4j2.xml:

<!-- Log4j2 configuration: console output, a rolling info.log file, and the custom RocketMQ appender. -->
<configuration status="WARN" monitorInterval="30">
    <!-- Values shared by the appenders below, referenced as ${name}. -->
    <Properties>
        <Property name="log_format">[%d{yyyy-MM-dd HH:mm:ss.SSS}] [%p] [%t] - %l - %m%n</Property>
        <Property name="log_dir">/tmp/log_files</Property>
        <Property name="max_size">30MB</Property>
        <Property name="backup_folder">${date:yyyy-MM}</Property>
        <Property name="backup_suffix">-%d{yyyy-MM-dd}-%i.log</Property>
    </Properties>
    <appenders>
        <!-- Writes events that pass the INFO threshold filter to standard output. -->
        <console name="Console" target="SYSTEM_OUT">
            <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
            <PatternLayout pattern="${log_format}"/>
        </console>
        <!-- Writes to ${log_dir}/info.log; rolled files go into a per-month folder and are
             triggered both by time and by the file reaching ${max_size}. -->
        <RollingFile name="InfoLog" fileName="${log_dir}/info.log" filePattern="${log_dir}/${backup_folder}/info${backup_suffix}">
            <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
            <PatternLayout pattern="${log_format}"/>
            <Policies>
                <TimeBasedTriggeringPolicy/>
                <SizeBasedTriggeringPolicy size="${max_size}"/>
            </Policies>
            <DefaultRolloverStrategy max="20"/>
        </RollingFile>
        <!-- Custom appender: the element name must match the @Plugin name of the Java class.
             "server" is the name server address and "topic" is the topic created in the console. -->
        <CustomRocketMqAppender name="RmqAppender" server="192.168.88.50:9876" group="SpringBootGroup" topic="Log4j2ToRocketMq" tag="boot-app">
            <PatternLayout pattern="${log_format}"/>
        </CustomRocketMqAppender>
    </appenders>
    <loggers>
        <logger name="org.springframework" level="INFO"/>
        <!-- The root logger forwards INFO and above to all three appenders. -->
        <root level="INFO">
            <appender-ref ref="Console"/>
            <appender-ref ref="InfoLog"/>
            <appender-ref ref="RmqAppender"/>
        </root>
    </loggers>
</configuration>

Nhận và xử lý thông điệp log từ hàng đợi RocketMQ

package com.doan.project.consumer;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**
 * Spring service that listens on the {@code Log4j2ToRocketMq} topic as a member of
 * the {@code ConsumerGroup} consumer group and receives each message as a String.
 */
@Service
@RocketMQMessageListener(topic = "Log4j2ToRocketMq", consumerGroup = "ConsumerGroup")
public class RmqConsumer implements RocketMQListener<String> {

    /**
     * Callback for a single received message.
     *
     * @param message the message payload as text
     */
    @Override
    public void onMessage(final String message) {
        // Handle the received message here.
    }
}

Thẻ: RocketMQ docker log4j2 SpringBoot

Đăng vào ngày 7 tháng 6 lúc 21:10