Netty實(shí)戰(zhàn)入門教程之?什么是Netty
一、BIO、NIO、AIO
學(xué)習(xí)Netty需要了解BIO、NIO、AIO,具體可參考
Java網(wǎng)絡(luò)編程IO模型 — BIO、NIO、AIO詳解
二、什么是Netty?
官網(wǎng)介紹
Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.
Netty 是 一個(gè)異步事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用程序框架
,用于快速開發(fā)可維護(hù)的高性能協(xié)議服務(wù)器和客戶端。
Netty 是一個(gè) NIO 客戶端服務(wù)器框架,可以快速輕松地開發(fā)協(xié)議服務(wù)器和客戶端等網(wǎng)絡(luò)應(yīng)用程序。它極大地簡化和流線了網(wǎng)絡(luò)編程,例如 TCP 和 UDP 套接字服務(wù)器。
“快速和簡單”并不意味著生成的應(yīng)用程序會(huì)受到可維護(hù)性或性能問題的影響。Netty 是經(jīng)過精心設(shè)計(jì)的,它借鑒了許多協(xié)議(如 FTP、SMTP、HTTP 以及各種基于二進(jìn)制和基于文本的遺留協(xié)議)的實(shí)現(xiàn)經(jīng)驗(yàn)。因此,Netty 成功地找到了一種方法,可以在不妥協(xié)的情況下實(shí)現(xiàn)易于開發(fā)、性能、穩(wěn)定性和靈活性。
三、為什么學(xué)習(xí)Netty?
Netty在NIO的基礎(chǔ)上進(jìn)行了封裝,比NIO強(qiáng)大,Netty使用很廣泛,用的企業(yè)多,所以需要去學(xué)習(xí),Netty支持高并發(fā),在高并發(fā)的情況下具有良好的吞吐量,是網(wǎng)絡(luò)通訊的首選框架
四、原生NIO存在的問題
- NIO的類庫和API繁雜,使用麻煩,需要熟練掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等
- 需要具備其他額外技能,要熟悉Java多線程編程,因?yàn)镹IO編程涉及到Reactor模式,你必須對多線程和網(wǎng)絡(luò)編程非常熟悉,才能編寫出高質(zhì)量的NIO程序
- 開發(fā)工作量和難度都非常大,例如客戶端面臨斷連重連、網(wǎng)絡(luò)閃斷、半包讀寫,失敗緩存、網(wǎng)絡(luò)擁塞和異常流的處理等
- JDK NIO的Bug:例如臭名昭著的 Epoll Bug,它會(huì)導(dǎo)致Selector空輪詢,最終導(dǎo)致CPU100%,直到JDK1.7版本該問題仍舊存在,沒有被根本解決
五、Netty有什么好處
Netty對JDK自帶的NIO的API進(jìn)行了封裝,解決了上述問題
- 設(shè)計(jì)優(yōu)雅,適用于各種傳輸類型的統(tǒng)一API阻塞和非阻塞Socket,基于靈活且可擴(kuò)展的事件模型,可以清晰地分離關(guān)注點(diǎn),高度可定制的線程模型 - 單線程,一個(gè)或多個(gè)線程池
- 使用方便,詳細(xì)記錄的JavaDoc,用戶指南和示例,沒有其它依賴項(xiàng),JDK5(Netty3.x)或6(Netty4.x)就可以
- 安全,完整的SSL/TLS和StartTLS支持
- 社區(qū)活躍,不斷更新;版本迭代周期短,發(fā)現(xiàn)的Bug可以被及時(shí)修復(fù),同時(shí),更多的新功能會(huì)被加入
- 更高的吞吐量,更低的延遲
- 更少的資源消耗
- 最小化不必要的內(nèi)存拷貝
Netty模型圖

六、那些領(lǐng)域用到了Netty
Netty在互聯(lián)網(wǎng)領(lǐng)域、大數(shù)據(jù)分布式計(jì)算領(lǐng)域、游戲行業(yè)、通信行業(yè)等獲得了廣泛的應(yīng)用,一些業(yè)界著名的開源組件也基于Netty的NIO框架構(gòu)建 (文章尾有詳細(xì)介紹)。 2.Netty的特點(diǎn) 高并發(fā) Netty是一款基于NIO(Nonblocking IO,非阻塞 IO)開發(fā)的網(wǎng)絡(luò)通信框架,對比于BIO(Blocking IO,阻塞IO),他的并發(fā)性能得到了很大提高 。
七、Netty模型
??簡單版本
工作原理示意圖-簡單版
Netty主要基于主從Reactor多線程模型,做了一定的改進(jìn),其中主從Reactor多線程有多個(gè)Reactor

對上圖說明
- BoosGroup線程維護(hù)Selector,只關(guān)注Accept
- 當(dāng)接受到Accept事件,獲取到對應(yīng)的SocketChannel,封裝成NIOSocketChannel并注冊到Worker線程(事件循環(huán)),并進(jìn)行維護(hù)
- 當(dāng)Worker線程監(jiān)聽到selector 中通道發(fā)生自己感興趣的事件后,就進(jìn)行處理(就由Handler),注意handler已經(jīng)加入通道。
??進(jìn)階版本
Netty主要基于主從Reactor多線程模型,做了一定的改進(jìn),其中主從Reactor多線程模型有多個(gè)Reactor

??詳細(xì)版本

對上圖的說明
- Netty抽象出兩組線程池, BossGroup專門負(fù)責(zé)客戶端的連接,WorkerGroup專門負(fù)責(zé)網(wǎng)絡(luò)的讀寫
- BossGroup和WorkerGroup類型都是NIOEventLoopGroup
- NIOEventLoopGroup相當(dāng)于**一個(gè)事件循環(huán)組,**這個(gè)組中有多個(gè)事件循環(huán),每一個(gè)事件循環(huán)是NIOEventLoop
- NIOEventLoop表示一個(gè)不斷循環(huán)的執(zhí)行處理任務(wù)的線程,每個(gè)NIOEventLoop都有一個(gè)Selector,用于監(jiān)聽綁定在其上的Socket網(wǎng)絡(luò)通訊
- NIOEventLoopGroup 可以有多個(gè)線程,即可以含有多個(gè)NIOEventLoop
- 每個(gè)Boss NIOEventLoop 循環(huán)執(zhí)行的步驟有3步
- 輪詢accept事件
- 處理accept事件,與client建立連接,生成NIoSocketChannel,并將其注冊到某個(gè)Worker NIOEventLoop 上的selector
- 處理任務(wù)隊(duì)列的任務(wù),即runAllTasks
7.每個(gè)Worker NIOEventLoop 循環(huán)執(zhí)行的步驟
- 輪詢r(jià)ead,write事件
- 處理i/o事件,在對應(yīng)的NIOSocketChannel處理
- 處理任務(wù)隊(duì)列的任務(wù),即runAllTasks
8.每個(gè)Worker NIoEventLoop 處理業(yè)務(wù)時(shí),會(huì)使用pipeline(管道)pipeline中包含了channel,即通過了pipelien可以獲取到對應(yīng)通道,管道中維護(hù)了很多的處理器
八、Netty入門案例 — TCP服務(wù)
? 需求說明
Netty服務(wù)器在6666端口監(jiān)聽,客戶端發(fā)送消息給服務(wù)器 “Hello,服務(wù)器”
服務(wù)器可以回復(fù)消息給客戶端 “hello 客戶端”
? 效果圖

? 核心源碼
NettyServer
服務(wù)器,監(jiān)聽6666端口
package com.wanshi.netty.simple;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyServer {
public static void main(String[] args) throws Exception {
// 創(chuàng)建BossGroup 和 WorkerGroup
//說明
//1.創(chuàng)建2個(gè)線程組,分別是boosGroup和workerGroup
//2.boosGroup只是處理連接請求,真正的與客戶端業(yè)務(wù)處理,會(huì)交給workerGroup完成
//3.兩個(gè)都是無限循環(huán)
//4. boosGroup 和 workerGroup 含有的子線程(NioEventLoop)的個(gè)數(shù)
// 默認(rèn)實(shí)際 CPU核數(shù)*2
EventLoopGroup boosGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//創(chuàng)建服務(wù)器端的啟動(dòng)的對象,配置參數(shù)
ServerBootstrap bootstrap = new ServerBootstrap();
//使用鏈?zhǔn)骄幊虂磉M(jìn)行設(shè)置
bootstrap.group(boosGroup, workerGroup) // 設(shè)置兩個(gè)線程組
.channel(NioServerSocketChannel.class) //使用NioServerSocketChannel作為服務(wù)器的通道實(shí)現(xiàn)
.option(ChannelOption.SO_BACKLOG, 128) // 設(shè)置線程隊(duì)列等待連接個(gè)數(shù)
.childOption(ChannelOption.SO_KEEPALIVE, true) // 設(shè)置保持活動(dòng)連接狀態(tài)
.childHandler(new ChannelInitializer<SocketChannel>() { // 創(chuàng)建一個(gè)通道初始化對象(匿名對象)
//給pipeline 設(shè)置處理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//可以使用一個(gè)集合管理SocketChannel,再推送消息時(shí),可以將業(yè)務(wù)加入到各個(gè)channel對應(yīng)的NioEventLoop的taskQueue
//或者 scheduleTaskQueue
System.out.println("客戶 SocketChannel:" + socketChannel.hashCode());
socketChannel.pipeline().addLast(new NettyServerHandler());
}
}); //給我們的workerGroup的某一個(gè)EventLoop的對應(yīng)的管道設(shè)置處理器
System.out.println("服務(wù)器 is ready...");
//綁定一個(gè)端口并且同步,生成了一個(gè)ChannelFuture對象
//啟動(dòng)服務(wù)器并綁定端口
ChannelFuture channelFuture = bootstrap.bind(6668).sync();
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("監(jiān)聽端口 6668 成功");
} else {
System.out.println("監(jiān)聽端口 6668 失敗");
}
}
});
//對關(guān)閉通道進(jìn)行監(jiān)聽
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
//優(yōu)雅關(guān)閉
boosGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}NettyServerHandler
服務(wù)器處理器,處理客戶端發(fā)送的消息并輸出到控制臺(tái),并向服務(wù)端發(fā)送消息
package com.wanshi.netty.simple;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import java.util.concurrent.TimeUnit;
/**
* 自定義一個(gè)Handler,需要繼承netty規(guī)定好的某個(gè)HandlerAdapter
* 這時(shí)我們自定義的handler才能稱為一個(gè)handler
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
//讀取數(shù)據(jù)事件(這里我們可以讀取客戶端發(fā)送的消息)
/**
* 1.ChannelHandlerContext ctx: 上下文對象,含有 管道pipeline,通道channel,地址
* 2.Object msg:就是客戶端發(fā)送的數(shù)據(jù),默認(rèn)Object
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server ctx =" + ctx);
//將 msg 轉(zhuǎn)成一個(gè)ByteBuf
// ByteBuf buf = (ByteBuf) msg;
// System.out.println("客戶端發(fā)送消息是:" + buf.toString(CharsetUtil.UTF_8));
// System.out.println("客戶端地址:" + ctx.channel().remoteAddress());
//自定義普通任務(wù)隊(duì)列,將耗時(shí)長的任務(wù)加入隊(duì)列,定義到NioEventLoop --> taskQueue
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
Thread.currentThread().sleep(10 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客戶端:喵2~", CharsetUtil.UTF_8));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread.currentThread().sleep(20 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客戶端:喵3~", CharsetUtil.UTF_8));
//用戶自定義定時(shí)任務(wù) --》 該任務(wù)是提交到 scheduleQueue中
ctx.channel().eventLoop().schedule(new Runnable() {
Thread.currentThread().sleep(5 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客戶端:喵4~", CharsetUtil.UTF_8));
}, 5, TimeUnit.SECONDS);
System.out.println("go ~");
}
* 數(shù)據(jù)讀取完畢
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//writeAndFlush 是 write+flush
//將數(shù)據(jù)寫入到緩存,并刷新
//一般講,需要對發(fā)送的數(shù)據(jù)進(jìn)行編碼
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客戶端:喵1~", CharsetUtil.UTF_8));
//處理異常,一般是需要關(guān)閉通道
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}NettyClient
客戶端,用于連接服務(wù)器
package com.wanshi.netty.simple;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyClient {
public static void main(String[] args) throws Exception {
//客戶端需要一個(gè)事件循環(huán)組
EventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
//創(chuàng)建一個(gè)客戶端啟動(dòng)對象
//客戶端使用的不是ServerGroup 而是Bootstrap
Bootstrap bootstrap = new Bootstrap();
//設(shè)置相關(guān)參數(shù)
bootstrap.group(eventExecutors) //設(shè)置線程組
.channel(NioSocketChannel.class) //設(shè)置客戶端通道的實(shí)現(xiàn)類(反射)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyClientHandler()); //加入自己的處理器
}
});
System.out.println("客戶端 is ok...");
//啟動(dòng)客戶端去連接服務(wù)器端, netty異步模型ChannelFuture
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
//給關(guān)閉通道進(jìn)行監(jiān)聽
channelFuture.channel().closeFuture().sync();
} finally {
//優(yōu)雅關(guān)閉線程池
eventExecutors.shutdownGracefully();
}
}
}NettyClientHandler
客戶端處理器,處理服務(wù)器發(fā)送的消息輸出到控制臺(tái),并向服務(wù)器發(fā)送消息
package com.wanshi.netty.simple;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 當(dāng)通道就緒就會(huì)觸發(fā)該方法
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client " + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,服務(wù)端Server:喵~", CharsetUtil.UTF_8));
}
* 當(dāng)通道有讀取事件時(shí),會(huì)觸發(fā)
* @param msg
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//將msg轉(zhuǎn)成buf
ByteBuf buf = (ByteBuf) msg;
System.out.println("服務(wù)器回復(fù)的消息:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服務(wù)器的地址:" + ctx.channel().remoteAddress());
// 當(dāng)通道發(fā)生異常時(shí)執(zhí)行此方法
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}?小結(jié)
以上就是【Bug 終結(jié)者】對Netty入門簡單的理解,小編認(rèn)為Java中支持三種網(wǎng)絡(luò)編程IO模型,BIO、NIO、AIO,Netty對NIO又做了一層封裝,本文我們已大致了解Netty到底是什么,Netty入門案例還需多敲,多練,方可掌握,通過本文能加固你對Netty的理解
到此這篇關(guān)于Netty實(shí)戰(zhàn)入門教程之 什么是Netty的文章就介紹到這了,更多相關(guān)Netty入門內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
淺談HBase在SpringBoot項(xiàng)目里的應(yīng)用(含HBaseUtil工具類)
這篇文章主要介紹了淺談HBase在SpringBoot項(xiàng)目里的應(yīng)用(含HBaseUtil工具類),具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-10-10
SpringCloud Ribbon負(fù)載均衡實(shí)例解析
這篇文章主要介紹了SpringCloud Ribbon負(fù)載均衡實(shí)例解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-11-11
java開發(fā)Activiti進(jìn)階篇流程實(shí)例詳解
這篇文章主要為大家介紹了java開發(fā)Activiti進(jìn)階篇流程實(shí)例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-08-08
SpringBoot+JUnit5+MockMvc+Mockito單元測試的實(shí)現(xiàn)
今天聊聊如何在 SpringBoot 中集成 Junit5、MockMvc、Mocktio。Junit5 是在 Java 棧中應(yīng)用最廣的測試框架,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-09-09
Spring boot 總結(jié)之跨域處理cors的方法
本篇文章主要介紹了Spring boot 總結(jié)之跨域處理cors的方法,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-02-02

