Netty的心跳檢測(cè)解析
一、網(wǎng)絡(luò)連接假死現(xiàn)象
客戶端的心跳檢測(cè)對(duì)于任何長(zhǎng)連接的應(yīng)用來(lái)說(shuō),都是一個(gè)非常基礎(chǔ)的功能。
要理解心跳的重要性,首先需要從網(wǎng)絡(luò)連接假死的現(xiàn)象說(shuō)起。
什么是連接假死呢?如果底層的TCP連接已經(jīng)斷開(kāi),但是服務(wù)器端并沒(méi)有正常地關(guān)閉套接字,認(rèn)為這條連接仍然是存在的。
連接假死的具體表現(xiàn)如下:
- 在服務(wù)器端,會(huì)有一些處于TCP_ESTABLISHED狀態(tài)的正常連接
- 在客戶端,TCP客戶端已經(jīng)顯示連接已經(jīng)斷開(kāi)
- 客戶端此時(shí)雖然可以進(jìn)行斷線重連操作,但是上一次連接狀態(tài)依然被服務(wù)器端認(rèn)為有效,并且服務(wù)器端的資源得不到正確釋放,包括套接字上下文以及接受/發(fā)送緩沖區(qū)
連接假死的情況雖然不常見(jiàn),但是確實(shí)存在。服務(wù)器端長(zhǎng)時(shí)間運(yùn)行后,會(huì)面臨大量假死連接得不到釋放的情況。由于每個(gè)連接都會(huì)消耗CPU和內(nèi)存資源,因此大量假死的連接會(huì)逐漸耗光服務(wù)器的資源,使得服務(wù)器越來(lái)越慢,IO處理效率越來(lái)越低,最終導(dǎo)致服務(wù)器崩潰。
連接假死通常是由多個(gè)原因造成的:
- 應(yīng)用程序出現(xiàn)線程堵塞,無(wú)法進(jìn)行連接的讀寫
- 網(wǎng)絡(luò)相關(guān)的設(shè)別出現(xiàn)故障
- 網(wǎng)絡(luò)丟包
解決假死的有效手段是客戶端定時(shí)進(jìn)行心跳檢測(cè),服務(wù)端定時(shí)進(jìn)行空閑檢測(cè)。
二、服務(wù)器端的空閑檢測(cè)
想解決假死問(wèn)題,服務(wù)器端的有效手段是空閑檢測(cè)。所謂空閑檢測(cè)就是每隔一段時(shí)間監(jiān)測(cè)子通道是否有數(shù)據(jù)讀寫,如果有則子通道是正常的,如果沒(méi)有則判定為假死,關(guān)閉子通道。
服務(wù)器端實(shí)現(xiàn)空閑檢測(cè)只需要使用Netty自帶的IdleStateHandler空閑狀態(tài)處理器就可以實(shí)現(xiàn)這個(gè)功能。
@Slf4j
public class HeartBeatServerHandler extends IdleStateHandler {
private static final int READ_IDLE_GAP = 150; // 最大空閑時(shí)間(s)
public HeartBeatServerHandler() {
super(READ_IDLE_GAP, 0, 0, TimeUnit.SECONDS);
}
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
log.info("{}秒內(nèi)未讀到數(shù)據(jù),關(guān)閉連接", READ_IDLE_GAP);
// 其他處理,如關(guān)閉會(huì)話
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 判斷消息實(shí)例
if (!(msg instanceof MessageProtos.Message message)) {
super.channelRead(ctx, msg);
return;
}
if (message.getType() == MessageProtos.HeadType.HEART_BEAT) {
if (ctx.channel().isActive()) {
// 將心跳數(shù)據(jù)包直接回給客戶端
ctx.writeAndFlush(msg);
}
}
super.channelRead(ctx, msg);
}
}在HeartBeatServerHandler的構(gòu)造函數(shù)中,調(diào)用了基類IdleStateHandler的構(gòu)造函數(shù),傳遞了四個(gè)參數(shù):
- 入站空閑檢測(cè)時(shí)長(zhǎng):指的是一段時(shí)間內(nèi)如果沒(méi)有消息入站就判定為連接假死
- 出站空閑檢測(cè)時(shí)長(zhǎng):指的是一段時(shí)間內(nèi)如果沒(méi)有數(shù)據(jù)出站就判定為連接假死
- 出/入站檢測(cè)時(shí)長(zhǎng):表示在一段時(shí)間內(nèi)如果沒(méi)有出站或者入站就判定為連接假死
- 時(shí)間單位
判定為假死之后IdleStateHandler會(huì)回調(diào)自己的channelIdle()方法,一般在這個(gè)方法中去進(jìn)行一些連接的關(guān)閉。
HeartBeatServerHandler實(shí)現(xiàn)的主要功能是空閑檢測(cè),需要客戶端定時(shí)發(fā)送心跳數(shù)據(jù)包(或報(bào)文、消息)進(jìn)行配合,而且客戶端發(fā)送心跳數(shù)據(jù)包的時(shí)間間隔需要遠(yuǎn)遠(yuǎn)小于服務(wù)器端的空閑檢測(cè)時(shí)間間隔。
收到客戶端的心跳數(shù)據(jù)包之后可以直接回復(fù)客戶端,讓客戶端也能進(jìn)行類似的空閑檢測(cè)。由于IdleStateHandler本身是一個(gè)入站處理器,只需要重寫這個(gè)子類的channelRead方法,然后將心跳數(shù)據(jù)包直接寫回給客戶端即可。
如果HeartBeatServerHandler要重寫channelRead方法,一定要調(diào)用積累的channelRead方法,不然IdleStateHandler的入站空閑檢測(cè)會(huì)無(wú)效。
三、客戶端的心跳報(bào)文
與服務(wù)器端的空閑檢測(cè)相配合,客戶端需要定期發(fā)送數(shù)據(jù)包到服務(wù)器端,通常這個(gè)數(shù)據(jù)包稱為心跳數(shù)據(jù)包。
@Slf4j
public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {
// 心跳的時(shí)間間隔,單位為秒
private static final int HEART_BEAT_INTERVAL = 50;
// 在Handler業(yè)務(wù)處理器被加入到流水線時(shí)開(kāi)始發(fā)送心跳數(shù)據(jù)包
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
ClientSession session = ctx.channel().attr(ClientSession.CLIENT_SESSION).get();
MessageProtos.MessageHeartBeat heartBeat =
MessageProtos.MessageHeartBeat.newBuilder()
.setSeq(0)
.setJson("{\"from\":\"client\"}")
.setUid(session.getUserDTO().getUserId())
.build();
MessageProtos.Message message = MessageProtos.Message.newBuilder()
.setType(MessageProtos.HeadType.HEART_BEAT)
.setSessionId(session.getSessionId())
.setMessageHeartBeat(heartBeat)
.build();
heartBeat(ctx, message);
super.handlerAdded(ctx);
}
private void heartBeat(ChannelHandlerContext ctx, MessageProtos.Message message) {
// 提交在給定延遲后啟用的一次性任務(wù)。
ctx.executor().schedule(() -> {
if (ctx.channel().isActive()) {
log.info("發(fā)送心跳消息給服務(wù)端");
ctx.writeAndFlush(message);
// 遞歸調(diào)用,發(fā)送下一次的心跳
heartBeat(ctx, message);
}
}, HEART_BEAT_INTERVAL, TimeUnit.SECONDS);
}
// 接收到服務(wù)器的心跳回寫
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof MessageProtos.Message message)) {
super.channelRead(ctx, msg);
return;
}
if (message.getType() == MessageProtos.HeadType.HEART_BEAT) {
log.info("收到會(huì)寫的心跳信息");
} else {
super.channelRead(ctx, msg);
}
}
}在HeartBeatClientHandler實(shí)例被加入到流水線時(shí),它重寫的handlerAdded方法被回調(diào)。在handlerAdded方法中開(kāi)始調(diào)用heartBeat方法發(fā)送心跳數(shù)據(jù)包。heartBeat是一個(gè)不斷遞歸調(diào)用的方法,它使用了ctx.executor()獲取當(dāng)前通道綁定的Reactor反應(yīng)器NIO線,然后通過(guò)NIO現(xiàn)線程的schedule定時(shí)調(diào)度方法,在50s后觸發(fā)這個(gè)方法的執(zhí)行,再之后遞歸調(diào)用同樣延時(shí)50s后繼續(xù)發(fā)送。
客戶端的心跳間隔要比服務(wù)器端的空閑檢測(cè)時(shí)間間隔要短,一般來(lái)說(shuō)要比它的一半要短一些,可以直接定義為空閑檢測(cè)時(shí)間間隔的1/3,以防止公網(wǎng)偶發(fā)的秒級(jí)抖動(dòng)。
HeartBeatClientHandler實(shí)例并不是一開(kāi)始就裝配到流水線中的,它裝配的實(shí)際實(shí)在登錄成功之后。
HeartBeatClientHandler實(shí)際上也可以集成IdleStateHandler類在客戶端進(jìn)行空閑檢測(cè),這樣客戶端也可以對(duì)服務(wù)器進(jìn)行假死判定,在服務(wù)器假死的情況下,客戶端可以發(fā)起重連。
相關(guān)文章
java實(shí)現(xiàn)分布式項(xiàng)目搭建的方法
如果淘寶的七天自動(dòng)確認(rèn)收貨讓你設(shè)計(jì)你用Java怎么實(shí)現(xiàn)
SpringCloud turbine監(jiān)控實(shí)現(xiàn)過(guò)程解析
java實(shí)現(xiàn)表格數(shù)據(jù)的存儲(chǔ)

