Phân tích cơ chế heartbeat và kết nối lại tự động trong Netty
- Giới thiệu cơ bản
Khái niệm heartbeat
Như tên gọi, heartbeat (nhịp tim) là một loại gói dữ liệu đặc biệt được gửi định kỳ giữa client và server trong các kết nối TCP dài hạn, nhằm thông báo cho đối phương rằng mình vẫn đang kết nối và đảm bảo tính hiệu quả của kết nối TCP.
Tại sao cần cơ chế heartbeat
Do tính không đáng tin cậy của mạng, trong quá trình duy trì kết nối TCP dài, có thể xảy ra các tình huống bất ngờ như cáp mạng bị rút đột ngột, mất điện đột ngột, gây gián đoạn kết nối giữa server và client. Trong những tình huống này, nếu không có tương tác giữa server và client, chúng không thể phát hiện nhanh chóng rằng đối phương đã mất kết nối. Để giải quyết vấn đề này, chúng ta cần giới thiệu cơ chế heartbeat. Nguyên lý hoạt động của cơ chế heartbeat là: khi không có tương tác dữ liệu giữa server và client trong một khoảng thời gian nhất định (trạng thái idle), client hoặc server sẽ gửi một gói dữ liệu đặc biệt đến đối phương. Khi bên nhận nhận được gói tin này, ngay lập tức gửi lại một gói tin đặc biệt cho bên gửi, đây là một tương tác PING-PONG. Tự nhiên, khi một bên nhận được tin nhắn heartbeat, họ biết rằng đối phương vẫn đang kết nối, đảm bảo tính hiệu quả của kết nối TCP.
Cách triển khai heartbeat
Chúng ta có thể triển khai cơ chế heartbeat bằng hai cách:
- Sử dụng cơ chế keepalive ở tầng TCP.
- Triển khai cơ chế heartbeat tùy chỉnh ở tầng ứng dụng.
Mặc dù ở tầng TCP đã cung cấp cơ chế keepalive, nhưng sử dụng nó có một số nhược điểm:
- Nó không phải là tiêu chuẩn của TCP và mặc định bị tắt.
- Cơ chế keepalive TCP phụ thuộc vào việc triển khai của hệ điều hành, thời gian heartbeat mặc định là 2 giờ, và việc sửa đổi keepalive cần gọi hệ thống (hoặc sửa đổi cấu hình hệ thống), không đủ linh hoạt.
- Keepalive TCP gắn với giao thức TCP, do đó nếu cần chuyển sang giao thức UDP, cơ chế keepalive sẽ không còn hiệu quả.
Mặc dù sử dụng cơ chế keepalive ở tầng TCP tiết kiệm lưu lượng hơn cơ chế heartbeat tùy chỉnh ở tầng ứng dụng, nhưng dựa trên các nhược điểm trên, trong thực tế, hầu hết mọi người đều chọn triển khai cơ chế heartbeat tùy chỉnh ở tầng ứng dụng.
Vậy thì, chúng ta hãy xem xét cách triển khai heartbeat trong Netty. Chìa khóa để triển khai heartbeat trong Netty là IdleStateHandler, nó có thể thiết lập bộ định thời cho việc đọc/ghi của một Channel, khi Channel không có tương tác dữ liệu trong một khoảng thời gian nhất định (trạng thái idle), nó sẽ kích hoạt sự kiện được chỉ định.
- Sử dụng Netty để triển khai heartbeat
Như đã đề cập ở trên, chìa khóa để triển khai heartbeat trong Netty là IdleStateHandler, vậy làm thế nào để sử dụng Handler này? Hãy xem xét constructor của nó:
public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
if (readerIdleTime <= 0) {
this.readerIdleTime = 0;
} else {
this.readerIdleTime = unit.toNanos(readerIdleTime);
}
if (writerIdleTime <= 0) {
this.writerIdleTime = 0;
} else {
this.writerIdleTime = unit.toNanos(writerIdleTime);
}
if (allIdleTime <= 0) {
this.allIdleTime = 0;
} else {
this.allIdleTime = unit.toNanos(allIdleTime);
}
}
Để khởi tạo một IdleStateHandler, chúng ta cần cung cấp ba tham số:
- readerIdleTime: thời gian đọc hết hạn. Khi không có dữ liệu được đọc từ Channel trong khoảng thời gian chỉ định, sẽ kích hoạt sự kiện IdleStateEvent loại READER_IDLE.
- writerIdleTime: thời gian ghi hết hạn. Khi không có dữ liệu được ghi vào Channel trong khoảng thời gian chỉ định, sẽ kích hoạt sự kiện IdleStateEvent loại WRITER_IDLE.
- allIdleTime: thời gian đọc/ghi hết hạn. Khi không có hoạt động đọc hoặc ghi nào trong khoảng thời gian chỉ định, sẽ kích hoạt sự kiện IdleStateEvent loại ALL_IDLE.
Để minh họa cơ chế heartbeat được triển khai bởi IdleStateHandler, chúng ta sẽ xây dựng một ví dụ cụ thể về EchoServer với hành vi sau:
- Trong ví dụ này, client và server giao tiếp thông qua kết nối TCP dài.
- Định dạng giao tiếp TCP là:
+--------+-----+---------------+
| Length |Type | Content |
| 17 | 1 |"XIN CHAO" |
+--------+-----+---------------+
- Client gửi tin nhắn đến server sau một khoảng thời gian ngẫu nhiên, server nhận được tin nhắn sẽ ngay lập tức trả lại nguyên vẹn tin nhắn đó cho client.
- Nếu client không có hoạt động đọc/ghi trong khoảng thời gian chỉ định, client sẽ tự động gửi tin nhắn heartbeat PING đến server, khi server nhận được tin nhắn heartbeat PING, cần trả lời một tin nhắn PONG.
Phần chung
Dựa trên hành vi được định nghĩa ở trên, chúng ta sẽ triển khai phần chung CustomHeartbeatHandler:
public abstract class CustomHeartbeatHandler extends SimpleChannelInboundHandler<ByteBuf> {
public static final byte PING_PACKET = 1;
public static final byte PONG_PACKET = 2;
public static final byte DATA_PACKET = 3;
protected String identifier;
private int heartbeatCounter = 0;
public CustomHeartbeatHandler(String identifier) {
this.identifier = identifier;
}
@Override
protected void channelRead0(ChannelHandlerContext context, ByteBuf byteBuf) throws Exception {
if (byteBuf.getByte(4) == PING_PACKET) {
respondWithPong(context);
} else if (byteBuf.getByte(4) == PONG_PACKET){
System.out.println(identifier + " nhận được gói PONG từ " + context.channel().remoteAddress());
} else {
processMessage(context, byteBuf);
}
}
protected void sendPingPacket(ChannelHandlerContext context) {
ByteBuf buffer = context.alloc().buffer(5);
buffer.writeInt(5);
buffer.writeByte(PING_PACKET);
context.writeAndFlush(buffer);
heartbeatCounter++;
System.out.println(identifier + " đã gửi gói PING đến " + context.channel().remoteAddress() + ", tổng số: " + heartbeatCounter);
}
private void respondWithPong(ChannelHandlerContext context) {
ByteBuf buffer = context.alloc().buffer(5);
buffer.writeInt(5);
buffer.writeByte(PONG_PACKET);
context.channel().writeAndFlush(buffer);
heartbeatCounter++;
System.out.println(identifier + " đã gửi gói PONG đến " + context.channel().remoteAddress() + ", tổng số: " + heartbeatCounter);
}
protected abstract void processMessage(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf);
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// Xử lý logic cho IdleStateEvent được tạo bởi IdleStateHandler.
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
switch (event.state()) {
case READER_IDLE:
handleReaderIdle(ctx);
break;
case WRITER_IDLE:
handleWriterIdle(ctx);
break;
case ALL_IDLE:
handleAllIdle(ctx);
break;
default:
break;
}
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.err.println("---" + ctx.channel().remoteAddress() + " đã kết nối---");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.err.println("---" + ctx.channel().remoteAddress() + " đã ngắt kết nối---");
}
protected void handleReaderIdle(ChannelHandlerContext ctx) {
System.err.println("---TRẠNG THÁI ĐỌC HẾT HẠN---");
}
protected void handleWriterIdle(ChannelHandlerContext ctx) {
System.err.println("---TRẠNG THÁI GHI HẾT HẠN---");
}
protected void handleAllIdle(ChannelHandlerContext ctx) {
System.err.println("---TRẠNG THÁI ĐỌC/VIẾT HẾT HẠN---");
}
}
Lớp CustomHeartbeatHandler chịu trách nhiệm gửi và nhận heartbeat. Chúng ta sẽ phân tích chi tiết vai trò của nó. Như đã đề cập ở trên, IdleStateHandler là chìa khóa để triển khai heartbeat, nó sẽ tạo ra các sự kiện IdleStateEvent khác nhau dựa trên các loại IO idle khác nhau, và việc bắt các sự kiện này thực chất được thực hiện trong phương thức userEventTriggered.
Hãy xem xét triển khai cụ thể của CustomHeartbeatHandler.userEventTriggered:
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
switch (event.state()) {
case READER_IDLE:
handleReaderIdle(ctx);
break;
case WRITER_IDLE:
handleWriterIdle(ctx);
break;
case ALL_IDLE:
handleAllIdle(ctx);
break;
default:
break;
}
}
}
Trong userEventTriggered, tùy thuộc vào state() của IdleStateEvent khác nhau mà có các xử lý khác nhau. Ví dụ, nếu là idle đọc dữ liệu, thì event.state() == READER_IDLE, do đó sẽ gọi handleReaderIdle để xử lý nó. CustomHeartbeatHandler cung cấp ba phương thức xử lý idle: handleReaderIdle, handleWriterIdle, handleAllIdle, ba phương thức này hiện chỉ có triển mặc định, chúng cần được ghi đè trong lớp con, hiện tại chúng ta tạm thời bỏ qua chúng, khi đến phần triển khai cụ thể của client và server sẽ xem xét chúng.
Sau khi hiểu điều này, chúng ta hãy xem xét phần xử lý dữ liệu:
@Override
protected void channelRead0(ChannelHandlerContext context, ByteBuf byteBuf) throws Exception {
if (byteBuf.getByte(4) == PING_PACKET) {
respondWithPong(context);
} else if (byteBuf.getByte(4) == PONG_PACKET){
System.out.println(identifier + " nhận được gói PONG từ " + context.channel().remoteAddress());
} else {
processMessage(context, byteBuf);
}
}
Trong CustomHeartbeatHandler.channelRead0, chúng ta trước tiên xác định loại gói tin dựa trên giao thức:
+--------+-----+---------------+
| Length |Type | Content |
| 17 | 1 |"XIN CHAO" |
+--------+-----+---------------+
Để xác định loại gói tin hiện tại, nếu là PING_PACKET thì đó là server nhận được tin nhắn PING từ client, lúc này server cần trả lời một tin nhắn PONG với loại là PONG_PACKET. Khi loại gói tin là PONG_PACKET, đó là client nhận được tin nhắn PONG do server gửi, lúc này chỉ cần in một log là được.
Phần client
Khởi tạo client
public class NetworkClient {
private NioEventLoopGroup workerGroup;
private Channel activeChannel;
private Bootstrap bootstrap;
private Random randomGenerator;
public static void main(String[] args) {
NetworkClient client = new NetworkClient();
client.initialize();
client.startCommunication();
}
public void initialize() {
workerGroup = new NioEventLoopGroup(4);
randomGenerator = new Random(System.currentTimeMillis());
bootstrap = new Bootstrap();
bootstrap
.group(workerGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new IdleStateHandler(0, 0, 5));
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
pipeline.addLast(new ClientHandler(NetworkClient.this));
}
});
}
public void startCommunication() {
try {
establishConnection();
sendRandomMessages();
} catch (Exception e) {
throw new RuntimeException("Lỗi khởi động client", e);
} finally {
workerGroup.shutdownGracefully();
}
}
private void establishConnection() {
if (activeChannel != null && activeChannel.isActive()) {
return;
}
ChannelFuture future = bootstrap.remoteAddress("127.0.0.1", 12345).connect();
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture futureListener) throws Exception {
if (futureListener.isSuccess()) {
activeChannel = futureListener.channel();
System.out.println("Kết nối đến server thành công!");
} else {
System.out.println("Kết nối đến server thất bại, thử lại sau 10s");
futureListener.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
establishConnection();
}
}, 10, TimeUnit.SECONDS);
}
}
});
}
private void sendRandomMessages() throws InterruptedException {
for (int i = 0; i < 10; i++) {
if (activeChannel != null && activeChannel.isActive()) {
String messageContent = "thông điệp từ client " + i;
ByteBuf buffer = activeChannel.alloc().buffer(5 + messageContent.getBytes().length);
buffer.writeInt(5 + messageContent.getBytes().length);
buffer.writeByte(CustomHeartbeatHandler.DATA_PACKET);
buffer.writeBytes(messageContent.getBytes());
activeChannel.writeAndFlush(buffer);
}
Thread.sleep(randomGenerator.nextInt(20000));
}
}
}
Mã trên là mã khởi tạo cho client Netty. Những bạn đã quen với Netty sẽ không quen thuộc với mã này. Chúng ta sẽ không nói thêm về các phần khác, chỉ xem xét phần ChannelInitializer.initChannel:
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new IdleStateHandler(0, 0, 5));
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
pipeline.addLast(new ClientHandler(NetworkClient.this));
}
});
Chúng ta thêm ba Handler vào pipeline: IdleStateHandler là cốt lõi của cơ chế heartbeat, chúng ta đặt thời gian timeout đọc/ghi cho client là 5s, tức là nếu client sau 5s không nhận được tin nhắn từ server hoặc không gửi tin nhắn đến server, sẽ tạo ra sự kiện ALL_IDLE. Tiếp theo chúng ta thêm LengthFieldBasedFrameDecoder, nó chịu trách vụ phân tích gói tin TCP của chúng ta, vì không liên quan đến mục đích của bài viết này, nên ở đây không mở rộng chi tiết. Handler cuối cùng là ClientHandler, nó kế thừa từ CustomHeartbeatHandler, là phần xử lý logic nghiệp vụ của chúng ta.
Handler của client
public class ClientHandler extends CustomHeartbeatHandler {
private NetworkClient clientReference;
public ClientHandler(NetworkClient client) {
super("client");
this.clientReference = client;
}
@Override
protected void processMessage(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
byte[] messageData = new byte[byteBuf.readableBytes() - 5];
byteBuf.skipBytes(5);
byteBuf.readBytes(messageData);
String messageContent = new String(messageData);
System.out.println(identifier + " nhận được nội dung: " + messageContent);
}
@Override
protected void handleAllIdle(ChannelHandlerContext ctx) {
super.handleAllIdle(ctx);
sendPingPacket(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
System.err.println("Kết nối bị ngắt, thử kết nối lại...");
clientReference.establishConnection();
}
}
ClientHandler kế thừa từ CustomHeartbeatHandler, nó ghi đè hai phương thức, một là processMessage, ở đây chỉ in ra tin nhắn nhận được. Phương thức ghi đè thứ hai là handleAllIdle. Như đã đề cập ở trên, client chịu trách nhiệm gửi tin nhắn heartbeat PING, khi client tạo ra sự kiện ALL_IDLE, sẽ dẫn đến việc phương thức userEventTriggered của lớp cha CustomHeartbeatHandler được gọi, và trong userEventTriggered, nó sẽ gọi các phương thức khác nhau dựa trên e.state(), do đó cuối cùng sẽ gọi ClientHandler.handleAllIdle, trong phương thức này, client gọi sendPingPacket để gửi một tin nhắn PING đến server.
Phần server
Khởi tạo server
public class NetworkServer {
private NioEventLoopGroup bossGroup;
private NioEventLoopGroup workerGroup;
public static void main(String[] args) {
NetworkServer server = new NetworkServer();
server.start();
}
public void start() {
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup(4);
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new IdleStateHandler(10, 0, 0));
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
pipeline.addLast(new ServerHandler());
}
});
Channel serverChannel = bootstrap.bind(12345).sync().channel();
System.out.println("Server đã khởi động tại cổng 12345");
serverChannel.closeFuture().sync();
} catch (Exception e) {
throw new RuntimeException("Lỗi khởi động server", e);
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
Phần khởi tạo server cũng không có gì đáng nói, nó cũng giống như khởi tạo client, chúng ta thêm ba Handler vào pipeline.
Handler của server
public class ServerHandler extends CustomHeartbeatHandler {
public ServerHandler() {
super("server");
}
@Override
protected void processMessage(ChannelHandlerContext channelHandlerContext, ByteBuf buf) {
byte[] messageData = new byte[buf.readableBytes() - 5];
ByteBuf responseBuffer = Unpooled.copiedBuffer(buf);
buf.skipBytes(5);
buf.readBytes(messageData);
String messageContent = new String(messageData);
System.out.println(identifier + " nhận được nội dung: " + messageContent);
channelHandlerContext.write(responseBuffer);
}
@Override
protected void handleReaderIdle(ChannelHandlerContext ctx) {
super.handleReaderIdle(ctx);
System.err.println("---client " + ctx.channel().remoteAddress().toString() + " đọc hết hạn, đóng kết nối---");
ctx.close();
}
}
ServerHandler kế thừa từ CustomHeartbeatHandler, nó ghi đè hai phương thức, một là processMessage, ở đây triển khai chức năng EchoServer: tức là sau khi nhận được tin nhắn từ client, sẽ ngay lập tức trả lại nguyên vẹn tin nhắn đó cho client. Phương thức ghi đè thứ hai là handleReaderIdle, vì server chỉ quan tâm đến đọc idle của client, nên chỉ ghi đè phương thức này. Nếu server sau một thời gian nhất định không nhận được tin nhắn từ client, sẽ kích hoạt sự kiện READER_IDLE, từ đó sẽ gọi handleReaderIdle này. Như đã đề cập ở trên, client chịu trách nhiệm gửi tin nhắn heartbeat PING, và timeout đọc của server gấp đôi khoảng thời gian gửi PING của client, do đó khi sự kiện READER_IDLE của server được kích hoạt, có thể xác định là client đã mất kết nối, do đó server sẽ trực tiếp đóng kết nối client.
Tổng kết
- Chìa khóa để triển khai heartbeat trong Netty là sử dụng IdleStateHandler để tạo ra các sự kiện idle tương ứng.
- Thường thì client chịu trách nhiệm gửi tin nhắn heartbeat PING, do đó client cần chú ý đến sự kiện ALL_IDLE, khi sự kiện này được kích hoạt, client cần gửi tin nhắn PING đến server, thông báo cho server rằng "tôi vẫn còn sống".
- Server là nơi nhận tin nhắn PING của client, do đó server quan tâm đến sự kiện READER_IDLE, và khoảng thời gian READER_IDLE của server cần lớn hơn khoảng thời gian sự kiện ALL_IDLE của client (ví dụ client ALL_IDLE là 5s không đọc/viết thì kích hoạt, do đó READER_IDLE của server có thể đặt là 10s)
- Khi server nhận được tin nhắn PING của client, sẽ gửi một tin nhắn PONG làm phản hồi. Một cặp tin nhắn PING-PONG chính là một tương tác heartbeat.
Triển khai kết nối lại tự động khi client mất kết nối
public class AutoReconnectingClient {
private NioEventLoopGroup workerGroup;
private Channel currentChannel;
private Bootstrap bootstrap;
private boolean running = true;
private final String serverHost;
private final int serverPort;
private final long reconnectInterval;
public AutoReconnectingClient(String host, int port, long reconnectIntervalSeconds) {
this.serverHost = host;
this.serverPort = port;
this.reconnectInterval = reconnectIntervalSeconds;
initialize();
}
public static void main(String[] args) {
AutoReconnectingClient client = new AutoReconnectingClient("127.0.0.1", 12345, 10);
client.startCommunication();
}
private void initialize() {
workerGroup = new NioEventLoopGroup(4);
bootstrap = new Bootstrap();
bootstrap
.group(workerGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new IdleStateHandler(0, 0, 5));
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
pipeline.addLast(new ReconnectingClientHandler(AutoReconnectingClient.this));
}
});
}
public void startCommunication() {
try {
while (running) {
if (currentChannel == null || !currentChannel.isActive()) {
System.out.println("Đang cố gắng kết nối đến server...");
connectToServer();
}
Thread.sleep(5000);
}
} catch (Exception e) {
throw new RuntimeException("Lỗi trong quá trình giao tiếp", e);
} finally {
shutdown();
}
}
private void connectToServer() {
ChannelFuture future = bootstrap.remoteAddress(serverHost, serverPort).connect();
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture futureListener) throws Exception {
if (futureListener.isSuccess()) {
currentChannel = futureListener.channel();
System.out.println("Kết nối đến server thành công!");
} else {
System.out.println("Kết nối đến server thất bại, thử lại sau " + reconnectInterval + " giây");
futureListener.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
connectToServer();
}
}, reconnectInterval, TimeUnit.SECONDS);
}
}
});
}
public void sendMessage(String message) {
if (currentChannel != null && currentChannel.isActive()) {
ByteBuf buffer = currentChannel.alloc().buffer(5 + message.getBytes().length);
buffer.writeInt(5 + message.getBytes().length);
buffer.writeByte(CustomHeartbeatHandler.DATA_PACKET);
buffer.writeBytes(message.getBytes());
currentChannel.writeAndFlush(buffer);
}
}
public void shutdown() {
running = false;
if (currentChannel != null) {
currentChannel.close();
}
workerGroup.shutdownGracefully();
}
}
Mã ở trên, chúng ta trừu tượng hóa phương thức connectToServer, nó chịu trách nhiệm thiết lập kết nối TCP giữa client và server, và khi kết nối TCP thất bại, connectToServer sẽ sử dụng "channel().eventLoop().schedule" để trì hoãn 10 giây trước khi thử kết nối lại.
Handler của client với kết nối lại tự động
public class ReconnectingClientHandler extends CustomHeartbeatHandler {
private AutoReconnectingClient client;
public ReconnectingClientHandler(AutoReconnectingClient client) {
super("auto-reconnecting-client");
this.client = client;
}
@Override
protected void processMessage(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
byte[] messageData = new byte[byteBuf.readableBytes() - 5];
byteBuf.skipBytes(5);
byteBuf.readBytes(messageData);
String messageContent = new String(messageData);
System.out.println(identifier + " nhận được nội dung: " + messageContent);
}
@Override
protected void handleAllIdle(ChannelHandlerContext ctx) {
super.handleAllIdle(ctx);
sendPingPacket(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
System.err.println("Kết nối bị ngắt, chuẩn bị kết nối lại...");
client.connectToServer();
}
}
Một điểm quan trọng của kết nối lại tự động là phát hiện xem kết nối đã bị ngắt hay chưa. Do đó chúng ta đã sửa đổi ReconnectingClientHandler, ghi đè phương thức channelInactive. Khi kết nối TCP bị ngắt, phương thức channelInactive sẽ được gọi lại, do đó chúng ta gọi client.connectToServer() trong phương thức này để thực hiện kết nối lại.