Tổng quan vấn đề
Trong quá trình phát triển, đôi khi chúng ta gặp phải tình huống máy chủ Netty có thể nhận tin nhắn từ máy khách nhưng lại gặp khó khăn trong việc gửi dữ liệu trở lại một cách chính xác. Vấn đề này có thể bắt nguồn từ việc xử lý mã hóa (encoding) và giải mã hóa (decoding) dữ liệu. Bài viết này sẽ tập trung vào việc khắc phục sự cố này và cung cấp giải pháp để gửi dữ liệu thập lục phân (hexadecimal) hiệu quả từ máy chủ Netty.
Cấu trúc Code
1. Lớp Khởi tạo Máy chủ (NettyServer)
Đây là lớp chịu trách nhiệm khởi tạo và cấu hình các nhóm luồng (event loop groups) cho máy chủ Netty.
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class NettyServer {
private static class SingletonWSServer {
static final NettyServer instance = new NettyServer();
}
public static NettyServer getInstance() {
return SingletonWSServer.instance;
}
private EventLoopGroup bossGroup; // Nhóm luồng chính (chấp nhận kết nối)
private EventLoopGroup workerGroup; // Nhóm luồng phụ (xử lý giao dịch)
private ServerBootstrap serverBootstrap;
private ChannelFuture channelFuture;
public NettyServer() {
// Khởi tạo nhóm luồng chính
bossGroup = new NioEventLoopGroup();
// Khởi tạo nhóm luồng phụ
workerGroup = new NioEventLoopGroup();
// Cấu hình ServerBootstrap
serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup) // Thiết lập nhóm luồng chính và phụ
.option(ChannelOption.SO_BACKLOG, 100) // Kích thước hàng đợi cho kết nối chờ xử lý
.handler(new LoggingHandler(LogLevel.INFO)) // Ghi log thông tin kết nối
.channel(NioServerSocketChannel.class) // Sử dụng kênh NIO cho máy chủ socket
.childHandler(new NettyServerInitializer()); // Bộ xử lý con cho các kết nối đến
}
/**
* Khởi động máy chủ tại cổng được chỉ định.
* @param port Cổng mà máy chủ sẽ lắng nghe.
*/
public void bind(int port) {
try {
this.channelFuture = serverBootstrap.bind(port).sync();
System.out.println("Máy chủ Netty WebSocket đã khởi động thành công...");
log.info("Máy chủ Netty WebSocket đã khởi động thành công...");
// Chờ cho đến khi kênh đóng lại
this.channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
System.err.println("Lỗi khi khởi động máy chủ Netty WebSocket: " + e.getMessage());
log.debug("Lỗi khi khởi động máy chủ Netty WebSocket: {}", e.getMessage());
}
}
/**
* Tắt máy chủ.
*/
public void shutdown() {
if (bossGroup != null) {
bossGroup.shutdownGracefully();
}
if (workerGroup != null) {
workerGroup.shutdownGracefully();
}
}
}
2. Bộ Khởi tạo Kênh Máy chủ (NettyServerInitializer)
Lớp này định cấu hình các ChannelPipeline cho các kết nối đến, bao gồm các bộ giải mã, bộ mã hóa và bộ xử lý tin nhắn tùy chỉnh.
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import lombok.extern.slf4j.Slf4j;
// Giả định các lớp này tồn tại và được import đúng cách
import com.your_package.netty.coder.NettyMessageDecoder;
import com.your_package.netty.coder.NettyMessageEncoder;
import com.your_package.netty.handler.HeartBeatHandler;
import com.your_package.netty.handler.NettyServerHandler;
@Slf4j
public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
log.info("Khởi tạo kênh SocketChannel...");
ChannelPipeline pipeline = ch.pipeline();
// Bộ giải mã tùy chỉnh
pipeline.addLast(new NettyMessageDecoder());
// Bộ mã hóa tùy chỉnh
pipeline.addLast(new NettyMessageEncoder());
// Xử lý phát hiện trạng thái rảnh rỗi (heartbeat)
pipeline.addLast(new HeartBeatHandler());
/**
* Cấu hình LengthFieldBasedFrameDecoder:
* @param maxFrameLength: Kích thước tối đa của khung dữ liệu.
* @param lengthFieldOffset: Độ lệch (offset) của trường độ dài trong khung.
* @param lengthFieldLength: Độ dài (tính bằng byte) của trường độ dài.
* @param lengthAdjustment: Giá trị điều chỉnh cho trường độ dài (có thể âm).
* @param initialBytesToStrip: Số byte cần bỏ qua sau khi đọc trường độ dài.
* @param failFast: Nếu true, sẽ báo lỗi ngay khi vượt quá maxFrameLength.
*/
pipeline.addLast(new LengthFieldBasedFrameDecoder(
1024 * 1024, // maxFrameLength: 1MB
4, // lengthFieldOffset: Bắt đầu từ byte thứ 4
4, // lengthFieldLength: Trường độ dài 4 byte
2, // lengthAdjustment: Điều chỉnh độ dài 2 byte
0 // initialBytesToStrip: Bỏ qua 0 byte ban đầu
));
// Bộ xử lý tùy chỉnh để giải mã và xử lý tin nhắn
pipeline.addLast(new NettyServerHandler());
}
}
Lưu ý: Các tham số cho LengthFieldBasedFrameDecoder cần được cấu hình cẩn thận dựa trên đặc tả giao thức dữ liệu của bạn.
3. Bộ Xử lý Trạng thái Rảnh rỗi (HeartBeatHandler)
Lớp này giám sát các sự kiện trạng thái rảnh rỗi (IdleStateEvent) và có thể đóng các kênh không hoạt động để tiết kiệm tài nguyên.
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
switch (event.state()) {
case READER_IDLE:
System.out.println("Phát hiện trạng thái rảnh rỗi khi đọc...");
break;
case WRITER_IDLE:
System.out.println("Phát hiện trạng thái rảnh rỗi khi ghi...");
break;
case ALL_IDLE:
System.out.println("Phát hiện trạng thái rảnh rỗi (đọc và ghi)...");
Channel channel = ctx.channel();
// Đóng kênh không hoạt động để tiết kiệm tài nguyên
channel.close();
System.out.println("Đã đóng kênh không hoạt động.");
break;
default:
break;
}
}
}
}
4. Bộ Giải mã Tin nhắn (NettyMessageDecoder)
Bộ giải mã này chịu trách nhiệm phân tích các byte nhận được từ máy khách thành các đối tượng tin nhắn có ý nghĩa.
import java.util.List;
// Giả định các lớp này tồn tại và được import đúng cách
import com.your_package.netty.constant.Delimiter;
import com.your_package.netty.pojo.GpsMessage;
import com.your_package.netty.pojo.LoginMsg;
import com.your_package.utils.CrcUtils;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
public class NettyMessageDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("Bắt đầu giải mã...");
int readableBytes = in.readableBytes();
if (readableBytes < Delimiter.MINIMUM_LENGTH) {
// Chưa đủ dữ liệu để giải mã, chờ thêm
return;
}
in.markReaderIndex(); // Đánh dấu vị trí đọc hiện tại
GpsMessage gpsMessage = new GpsMessage();
byte packetLengthByte = in.readByte();
int packetLength = packetLengthByte & 0xff; // Chuyển đổi byte thành int không dấu
gpsMessage.setPacketLen(packetLength);
byte messageType = in.readByte();
gpsMessage.setAgreement(messageType);
ByteBuf payload = null;
if (messageType == Delimiter.LOGIN_PACKET) { // Gói tin đăng nhập
LoginMsg loginMsg = new LoginMsg();
// Giả sử CrcUtils.decodeCodeIDFrame đọc một phần dữ liệu từ 'in'
payload = CrcUtils.decodeCodeIDFrame(ctx, in);
String deviceId = CrcUtils.bytesToHexString(payload);
System.out.println("Device ID: " + deviceId);
loginMsg.setCardId(deviceId);
gpsMessage.setContent(loginMsg);
} else if (messageType == Delimiter.STATUS_PACKET) { // Gói tin trạng thái/heartbeat
System.out.println("Nhận gói tin trạng thái...");
payload = CrcUtils.decodeCodeIDFrame(ctx, in); // Giả định đọc phần payload
String statusContent = CrcUtils.bytesToHexString(payload);
System.out.println("Nội dung trạng thái: " + statusContent);
gpsMessage.setContent(statusContent);
}
// Nếu có các loại gói tin khác, thêm các câu lệnh 'else if' tương ứng
out.add(gpsMessage); // Thêm đối tượng tin nhắn đã giải mã vào danh sách
System.out.println("Giải mã hoàn tất!");
}
}
5. Bộ Mã hóa Tin nhắn (NettyMessageEncoder)
Bộ mã hóa này chuyển đổi các đối tượng GpsMessage thành các byte để gửi đến máy khách.
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
// Giả định lớp GpsMessage có cấu trúc phù hợp
// import com.your_package.netty.pojo.GpsMessage;
public class NettyMessageEncoder extends MessageToByteEncoder<GpsMessage> {
@Override
protected void encode(ChannelHandlerContext ctx, GpsMessage message, ByteBuf out) throws Exception {
// 1. Ghi độ dài gói tin (giả định là độ dài của phần nội dung + một số byte overhead)
// Cần điều chỉnh logic này dựa trên định nghĩa chính xác của packetLen
out.writeInt(message.getPacketLen());
// 2. Ghi loại tin nhắn
out.writeByte(message.getAgreement());
// 3. Ghi nội dung tin nhắn
// Cần đảm bảo nội dung được chuyển đổi thành byte một cách chính xác
// Ví dụ: Nếu content là String, bạn có thể cần chuyển nó thành byte[]
// Nếu content là một đối tượng phức tạp, bạn cần có logic serialize phù hợp
if (message.getContent() != null) {
// Giả định phương thức toString() trả về biểu diễn byte mong muốn hoặc bạn cần
// một cách serialize khác (ví dụ: JSON, Protobuf, hoặc byte tùy chỉnh)
String contentString = message.getContent().toString();
out.writeBytes(contentString.getBytes()); // Cần xác định encoding phù hợp
}
}
}
6. Bộ Xử lý Tin nhắn Máy chủ (NettyServerHandler)
Đây là bộ xử lý chính, xử lý các tin nhắn đến, quản lý các kênh và gửi phản hồi (bao gồm cả dữ liệu thập lục phân).
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
// Giả định các lớp này tồn tại và được import đúng cách
// import com.your_package.netty.channel.CardChannelRel;
// import com.your_package.netty.constant.Delimiter;
// import com.your_package.netty.pojo.GpsMessage;
// import com.your_package.netty.pojo.LoginMsg;
// import com.your_package.utils.ConvertCode;
// import com.your_package.utils.CrcUtils;
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
// Quản lý tất cả các kênh kết nối
public static final ChannelGroup clientChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("Bộ xử lý tin nhắn nhận được: " + msg);
Channel currentChannel = ctx.channel();
GpsMessage receivedMessage = (GpsMessage) msg;
if (receivedMessage != null) {
byte messageType = receivedMessage.getAgreement();
if (messageType == Delimiter.LOGIN_PACKET) { // Gói tin đăng nhập
LoginMsg loginData = (LoginMsg) receivedMessage.getContent();
String deviceId = loginData.getCardId();
// Lưu trữ mối quan hệ giữa deviceId và channel
CardChannelRel.put(deviceId, currentChannel);
System.out.println("Thiết bị đã đăng nhập: " + deviceId);
// Gửi phản hồi đăng nhập dưới dạng thập lục phân
String loginResponseHex = "0105000100000000"; // Ví dụ: ID, Type, Length, Data
writeHexDataToClient(loginResponseHex, currentChannel, "Phản hồi đăng nhập");
} else if (messageType == Delimiter.STATUS_PACKET) { // Gói tin trạng thái/heartbeat
String heartBeatData = (String) receivedMessage.getContent();
System.out.println("Nhận heartbeat: " + heartBeatData);
// Gửi lại dữ liệu heartbeat dưới dạng thập lục phân (hoặc một phản hồi cụ thể)
// Giả sử bạn muốn gửi lại heartbeat đã được mã hóa hex
String heartbeatResponseHex = ConvertCode.bytesToHexString(heartBeatData.getBytes()); // Cần hàm chuyển đổi byte sang hex string
writeHexDataToClient(heartbeatResponseHex, currentChannel, "Phản hồi heartbeat");
} else {
// Xử lý các loại tin nhắn khác nếu có
System.out.println("Nhận loại tin nhắn không xác định.");
}
}
}
/**
* Gửi dữ liệu thập lục phân đến máy khách.
* @param hexData Chuỗi dữ liệu thập lục phân cần gửi.
* @param targetChannel Kênh của máy khách nhận.
* @param logMarker Nhãn dùng cho log/in.
*/
public void writeHexDataToClient(final String hexData, Channel targetChannel, final String logMarker) {
try {
// Chuyển đổi chuỗi thập lục phân thành mảng byte
byte[] dataBytes = ConvertCode.hexStringToByte(hexData);
ByteBuf buffer = Unpooled.wrappedBuffer(dataBytes); // Tạo ByteBuf từ mảng byte
targetChannel.writeAndFlush(buffer).addListener((ChannelFutureListener) future -> {
StringBuilder logBuilder = new StringBuilder();
if (!StringUtils.isEmpty(logMarker)) {
logBuilder.append("【").append(logMarker).append("】");
}
if (future.isSuccess()) {
System.out.println(logBuilder.toString() + "Gửi dữ liệu thành công: " + hexData);
log.info("{} Gửi dữ liệu thành công: {}", logBuilder.toString(), hexData);
} else {
System.err.println(logBuilder.toString() + "Gửi dữ liệu thất bại: " + hexData);
log.error("{} Gửi dữ liệu thất bại: {}", logBuilder.toString(), hexData, future.cause());
}
});
} catch (Exception e) {
e.printStackTrace();
System.err.println("Lỗi khi gửi dữ liệu thập lục phân: " + e.getMessage());
log.error("Lỗi khi gửi dữ liệu thập lục phân:", e);
}
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
clientChannels.add(ctx.channel());
System.out.println("Client connected: " + ctx.channel().remoteAddress());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
clientChannels.remove(ctx.channel());
System.out.println("Client disconnected: " + ctx.channel().remoteAddress());
// Có thể xóa mối quan hệ trong CardChannelRel tại đây nếu cần
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.channel().close(); // Đóng kênh khi có lỗi
clientChannels.remove(ctx.channel()); // Loại bỏ khỏi nhóm quản lý
System.err.println("Exception caught: " + cause.getMessage());
}
}
7. Lớp Quản lý Quan hệ Kênh (CardChannelRel)
Lớp này giúp quản lý mối quan hệ giữa một định danh duy nhất (ví dụ: deviceId) và Channel tương ứng của nó.
import io.netty.channel.Channel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Quản lý mối quan hệ giữa ID thiết bị và kênh Netty.
*/
public class CardChannelRel {
// Sử dụng ConcurrentHashMap để đảm bảo an toàn luồng
private static final Map channelMap = new ConcurrentHashMap<>();
/**
* Thêm hoặc cập nhật mối quan hệ giữa ID và kênh.
* @param deviceId ID của thiết bị.
* @param channel Kênh Netty tương ứng.
*/
public static void put(String deviceId, Channel channel) {
channelMap.put(deviceId, channel);
System.out.println("Đã thêm/cập nhật quan hệ: " + deviceId + " -> " + channel.id().asShortText());
}
/**
* Lấy kênh dựa trên ID thiết bị.
* @param deviceId ID của thiết bị.
* @return Channel tương ứng hoặc null nếu không tìm thấy.
*/
public static Channel get(String deviceId) {
return channelMap.get(deviceId);
}
/**
* Xóa mối quan hệ dựa trên ID thiết bị.
* @param deviceId ID của thiết bị.
*/
public static void remove(String deviceId) {
Channel removedChannel = channelMap.remove(deviceId);
if (removedChannel != null) {
System.out.println("Đã xóa quan hệ: " + deviceId + " -> " + removedChannel.id().asShortText());
}
}
/**
* In ra tất cả các mối quan hệ hiện có.
*/
public static void printAllRelations() {
System.out.println("--- Danh sách các mối quan hệ đang hoạt động ---");
if (channelMap.isEmpty()) {
System.out.println("Không có mối quan hệ nào.");
} else {
channelMap.forEach((deviceId, channel) ->
System.out.println("ID: " + deviceId + ", Channel: " + channel.id().asShortText())
);
}
System.out.println("-----------------------------------------------");
}
}
Công cụ Hỗ trợ (Utils)
Các lớp tiện ích này giúp thực hiện các tác vụ như chuyển đổi mã hóa, tính toán CRC, v.v.
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
public class ConvertCode {
/**
* Chuyển đổi một chuỗi thập lục phân thành mảng byte.
* @param hex String chứa dữ liệu thập lục phân.
* @return Mảng byte tương ứng.
*/
public static byte[] hexStringToByte(String hex) {
if (hex == null || hex.length() % 2 != 0) {
return null;
}
// Loại bỏ khoảng trắng nếu có
hex = hex.replaceAll("\\s+", "");
return ByteBufUtil.decodeHexBinary(hex);
}
/**
* Chuyển đổi mảng byte thành chuỗi thập lục phân.
* @param bytes Mảng byte cần chuyển đổi.
* @return Chuỗi thập lục phân (in hoa).
*/
public static String bytesToHexString(byte[] bytes) {
if (bytes == null) {
return "";
}
return ByteBufUtil.hexDump(bytes).toUpperCase();
}
// Có thể thêm các phương thức khác như tính CRC, v.v.
// Ví dụ: public static String calculateCrc16(byte[] data) { ... }
}
// --- Ví dụ về CrcUtils (nếu cần) ---
class CrcUtils {
public static String CRC_16(byte[] bytes) {
int crc = 0xFFFF;
for (byte b : bytes) {
crc ^= (b & 0xFF);
for (int i = 0; i < 8; i++) {
if ((crc & 0x0001) != 0) {
crc >>= 1;
crc ^= 0xA001; // Polynomial for CRC-16/MODBUS
} else {
crc >>= 1;
}
}
}
// Trả về dạng hex string, padding với số 0 nếu cần
return String.format("%04X", crc).toUpperCase();
}
// Giả định phương thức này đọc dữ liệu từ ByteBuf và trả về ByteBuf mới
// Cần triển khai chi tiết dựa trên giao thức của bạn
public static ByteBuf decodeCodeIDFrame(ChannelHandlerContext ctx, ByteBuf buffer) {
// Logic đọc và trả về phần dữ liệu cụ thể (ví dụ: ID thiết bị)
// Ví dụ đơn giản: đọc 8 byte đầu tiên
int readable = buffer.readableBytes();
int bytesToRead = Math.min(8, readable); // Đọc tối đa 8 byte hoặc số byte còn lại
if (bytesToRead <= 0) return null;
ByteBuf payload = buffer.readBytes(bytesToRead);
return payload;
}
}
// --- Ví dụ về Delimiter (nếu cần) ---
class Delimiter {
public static final byte LOGIN_PACKET = 0x01;
public static final byte STATUS_PACKET = 0x02;
public static final int MINIMUM_LENGTH = 2; // Độ dài tối thiểu (ví dụ: 1 byte Packet Length + 1 byte Type)
}
// --- Ví dụ về POJO (nếu cần) ---
class GpsMessage {
private int packetLen;
private byte agreement;
private Object content; // Có thể là String, LoginMsg, hoặc các đối tượng khác
public int getPacketLen() { return packetLen; }
public void setPacketLen(int packetLen) { this.packetLen = packetLen; }
public byte getAgreement() { return agreement; }
public void setAgreement(byte agreement) { this.agreement = agreement; }
public Object getContent() { return content; }
public void setContent(Object content) { this.content = content; }
@Override
public String toString() {
return "GpsMessage{" +
"packetLen=" + packetLen +
", agreement=" + String.format("0x%02X", agreement) +
", content=" + content +
'}';
}
}
class LoginMsg {
private String cardId; // ID của thiết bị
public String getCardId() { return cardId; }
public void setCardId(String cardId) { this.cardId = cardId; }
@Override
public String toString() {
return "LoginMsg{cardId='" + cardId + "'}";
}
}
Hiệu quả
Bằng cách cấu hình chính xác bộ giải mã, bộ mã hóa và bộ xử lý tin nhắn, máy chủ Netty có thể gửi dữ liệu thập lục phân một cách đáng tin cậy đến máy khách.