深入理解Netty核心類及其作用
MessageToByteEncoder
MessageToByteEncoder是一個抽象編碼器,子類可重寫encode方法把對象編碼為ByteBuf輸出。
MessageToByteEncoder繼承自ChannelOutboundHandlerAdapter,encode在出站是被調(diào)用。
public class MyMessageEncoder extends MessageToByteEncoder<MessagePO> {
@Override
protected void encode(ChannelHandlerContext ctx, MessagePO msg, ByteBuf out) throws Exception {
System.out.println("MyMessageEncoder.encode,被調(diào)用");
String json = JSONObject.toJSONString(msg);
out.writeInt(json.getBytes(StandardCharsets.UTF_8).length);
out.writeBytes(json.getBytes(StandardCharsets.UTF_8));
}
}
ByteToMessageDecoder
ByteToMessageDecoder是一種ChannelInboundHandler,可以稱為解碼器,負責(zé)將byte字節(jié)流(ByteBuf)轉(zhuǎn)換成一種Message,Message是應(yīng)用可以自己定義的一種Java對象。
ByteToMessageDecoder:用于將字節(jié)轉(zhuǎn)為消息,需要檢測緩沖區(qū)是否有足夠的字節(jié)。
public class MyMessageDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyMessageDecoder.decode,被調(diào)用");
while (in.readableBytes() >= 4){
int num = in.readInt();
System.out.println("解碼出一個整數(shù):"+num);
out.add(num);
}
}
}
ReplayingDecoder
ReplayingDecoder:繼承自ByteToMessageDecoder,不需要檢測緩沖區(qū)是否有足夠的字節(jié),但是ReplayingDecoder的速度略慢于ByteToMessageDecoder,而且并不是所有的ByteBuf都支持。
項目復(fù)雜度高用ReplayingDecoder,否則使用ByteToMessageDecoder。
public class MyMessageDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyMessageDecoder.decode,被調(diào)用");
int length = in.readInt();
byte[] content = new byte[length];
in.readBytes(content);
String json = new String(content,StandardCharsets.UTF_8);
MessagePO po = JSONObject.parseObject(json,MessagePO.class);
out.add(po);
}
}
MessageToMessageEncoder
用于從一種消息編碼為另外一種消息,例如從POJO到POJO,是一種ChannelOutboundHandler
MessageToMessageDecoder
從一種消息解碼為另一種消息,例如POJO到POJO,是一種ChannelInboundHandler
MessageToMessageCodec
整合了MessageToMessageEncoder 和 MessageToMessageDecoder
public class RequestMessageCodec extends MessageToMessageCodec<String, RequestData> {
@Override
protected void encode(ChannelHandlerContext ctx, RequestData msg, List<Object> out) throws Exception {
System.out.println("RequestMessageCodec.encode 被調(diào)用 " + msg);
String json = JSONObject.toJSONString(msg);
out.add(json);
}
@Override
protected void decode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception {
System.out.println("RequestMessageCodec.decode 被調(diào)用 " + msg);
RequestData po = JSONObject.parseObject(msg, RequestData.class);
out.add(po);
}
}
ChannelInitializer
ChannelInitializer是一種特殊的ChannelInboundHandler,可以通過一種簡單的方式(調(diào)用initChannel方法)來初始化Channel。
通常在Bootstrap.handler(ChannelHandler) , ServerBootstrap.handler(ChannelHandler) 和 ServerBootstrap.childHandler(ChannelHandler)中給Channel設(shè)置ChannelPipeline。
注意:當initChannel被執(zhí)行完后,會將當前的handler從Pipeline中移除。
Bootstrap bootstrap = new Bootstrap().group(group)//設(shè)置線程組
.channel(NioSocketChannel.class)//設(shè)置客戶端通道的實現(xiàn)類
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyClientHandler());//加入自己的處理器
}
});
ServerBootstrap bootstrap = new ServerBootstrap().group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作為服務(wù)器的通道實現(xiàn)
.option(ChannelOption.SO_BACKLOG, 128)//設(shè)置線程隊列等待連接的個數(shù)
.childOption(ChannelOption.SO_KEEPALIVE, true)//設(shè)置保持活動連接狀態(tài)
// .handler(null)//該Handler對應(yīng)bossGroup
.childHandler(new ChannelInitializer<SocketChannel>() {//給workerGroup的EventLoop對應(yīng)的管道設(shè)置處理器
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyServerHandler());
}
});
SimpleChannelInboundHandler
SimpleChannelInboundHandler繼承自ChannelInboundHandlerAdapter,可以通過泛型來規(guī)定消息類型。
處理入站的數(shù)據(jù)我們只需要實現(xiàn)channelRead0方法。
SimpleChannelInboundHandler在接收到數(shù)據(jù)后會自動release掉數(shù)據(jù)占用的Bytebuffer資源,ChannelInboundHandlerAdapter不會自動釋放。
public class MyClientHandler extends SimpleChannelInboundHandler<MessagePO> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessagePO msg) throws Exception {
System.out.println("收到服務(wù)端消息:" + msg);
}
}
DefaultEventLoopGroup
在向pipline中添加ChannelHandler時,可以提供一個新的線程組,Handler業(yè)務(wù)會在該線程中執(zhí)行。
當加ChannelHandler需要執(zhí)行多線程并發(fā)業(yè)務(wù)時,DefaultEventLoopGroup可以派上大用處。
如果沒有設(shè)置DefaultEventLoopGroup,默認使用的是EventLoopGroup workerGroup = new NioEventLoopGroup();
DefaultEventLoopGroup businessGroup = new DefaultEventLoopGroup(100); ... addLast(businessGroup, new MyNettyServerHandler())
/**
* 讀取客戶端發(fā)送過來的消息
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("收到客戶信息:" + byteBuf.toString(CharsetUtil.UTF_8));
System.out.println("客戶端地址:" + ctx.channel().remoteAddress());
System.out.println("處理線程:" + Thread.currentThread().getName());
ctx.executor().parent().execute(()->{
try {
System.out.println("parent().execute Thread = " + Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(2L);
System.out.println("parent任務(wù)執(zhí)行完成1");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
ctx.executor().parent().execute(()->{
try {
System.out.println("parent().execute Thread = " + Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(2L);
System.out.println("parent任務(wù)執(zhí)行完成2");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
ctx.executor().parent().execute(()->{
try {
System.out.println("parent().execute Thread = " + Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(2L);
System.out.println("parent任務(wù)執(zhí)行完成3");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}以上代碼執(zhí)行日志如下:
收到客戶信息:Hello 服務(wù)端
客戶端地址:/127.0.0.1:60345
處理線程:defaultEventLoopGroup-4-1
parent().execute Thread = defaultEventLoopGroup-4-2
parent().execute Thread = defaultEventLoopGroup-4-3
程序繼續(xù)~~ defaultEventLoopGroup-4-1
parent().execute Thread = defaultEventLoopGroup-4-4
parent任務(wù)執(zhí)行完成1
parent任務(wù)執(zhí)行完成3
parent任務(wù)執(zhí)行完成2
EventLoop定時任務(wù)
可以在Handler中通過方法ctx.channel().eventLoop().schedule()添加定時任務(wù)
ctx.channel().eventLoop().schedule(()->{
try {
System.out.println("Thread.currentThread().getName() = " + Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(2L);
System.out.println("定時任務(wù)執(zhí)行完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
},10L,TimeUnit.SECONDS);
到此這篇關(guān)于深入理解Netty核心類及其作用的文章就介紹到這了,更多相關(guān)Netty核心類內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
使用Get方式提交數(shù)據(jù)到Tomcat服務(wù)器的方法
這篇文章將介紹向服務(wù)器發(fā)送數(shù)據(jù),并且服務(wù)器將數(shù)據(jù)的處理結(jié)果返回給客戶端,本文給大家介紹使用Get方式向服務(wù)器發(fā)送數(shù)據(jù),感興趣的朋友一起學(xué)習(xí)吧2016-04-04
spring boot如何基于JWT實現(xiàn)單點登錄詳解
這篇文章主要介紹了spring boot如何基于JWT實現(xiàn)單點登錄詳解,用戶只需登錄一次就能夠在這兩個系統(tǒng)中進行操作。很明顯這就是單點登錄(Single Sign-On)達到的效果,需要的朋友可以參考下2019-06-06
關(guān)于Java中的try-with-resources語句
這篇文章主要介紹了關(guān)于Java中的try-with-resources語句,try-with-resources是Java中的環(huán)繞語句之一,旨在減輕開發(fā)人員釋放try塊中使用的資源的義務(wù),需要的朋友可以參考下2023-05-05
Java中的synchronized?優(yōu)化方法之鎖膨脹機制
這篇文章主要介紹了Java中的synchronized?優(yōu)化方法之鎖膨脹機制,鎖膨脹機制是提升?synchronized?性能最有利的方法之一,下面我們就來看看什么事鎖膨脹及鎖膨脹的各種細節(jié)2022-05-05
SpringDataJpa創(chuàng)建聯(lián)合索引的實現(xiàn)
這篇文章主要介紹了SpringDataJpa創(chuàng)建聯(lián)合索引的實現(xiàn),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12
Java怎樣創(chuàng)建集合才能避免造成內(nèi)存泄漏你了解嗎
內(nèi)存泄漏是指無用對象持續(xù)占有內(nèi)存或無用對象的內(nèi)存得不到及時釋放,從而造成內(nèi)存空間的浪費稱為內(nèi)存泄漏。長生命周期的對象持有短生命周期對象的引用就很可能發(fā)生內(nèi)存泄漏,盡管短生命周期對象已經(jīng)不再需要,但是因為長生命周期持有它的引用而導(dǎo)致不能被回收2021-09-09

