Netty學(xué)習(xí)教程之基礎(chǔ)使用篇
什么Netty?
Netty是由JBOSS提供的一個(gè)java開(kāi)源框架。Netty提供異步的、事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用程序框架和工具,用以快速開(kāi)發(fā)高性能、高可靠性的網(wǎng)絡(luò)服務(wù)器和客戶(hù)端程序。
也就是說(shuō),Netty 是一個(gè)基于NIO的客戶(hù)、服務(wù)器端編程框架,使用Netty 可以確保你快速和簡(jiǎn)單的開(kāi)發(fā)出一個(gè)網(wǎng)絡(luò)應(yīng)用,例如實(shí)現(xiàn)了某種協(xié)議的客戶(hù),服務(wù)端應(yīng)用。Netty相當(dāng)簡(jiǎn)化和流線(xiàn)化了網(wǎng)絡(luò)應(yīng)用的編程開(kāi)發(fā)過(guò)程,例如,TCP和UDP的socket服務(wù)開(kāi)發(fā)。
我們下面編寫(xiě)四個(gè)類(lèi)
1.用于接收數(shù)據(jù)的服務(wù)器端Socket
2.用于接收客戶(hù)端的消息,用于接收和反饋客戶(hù)端發(fā)出的消息類(lèi)ServertHandler
3.用于發(fā)送數(shù)據(jù)的服務(wù)器端Client
4.用于發(fā)送數(shù)據(jù)和接收服務(wù)器端發(fā)出的數(shù)據(jù)處理類(lèi)ClientHandler
Socket.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
public class Server {
public static void main(String[] args) throws InterruptedException {
//1.第一個(gè)線(xiàn)程組是用于接收Client端連接的
EventLoopGroup bossGroup = new NioEventLoopGroup();
//2.第二個(gè)線(xiàn)程組是用于實(shí)際的業(yè)務(wù)處理的
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);//綁定兩個(gè)線(xiàn)程池
b.channel(NioServerSocketChannel.class);//指定NIO的模式,如果是客戶(hù)端就是NioSocketChannel
b.option(ChannelOption.SO_BACKLOG, 1024);//TCP的緩沖區(qū)設(shè)置
b.option(ChannelOption.SO_SNDBUF, 32*1024);//設(shè)置發(fā)送緩沖的大小
b.option(ChannelOption.SO_RCVBUF, 32*1024);//設(shè)置接收緩沖區(qū)大小
b.option(ChannelOption.SO_KEEPALIVE, true);//保持連續(xù)
b.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());//拆包粘包定義結(jié)束字符串(第一種解決方案)
sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,buf));//在管道中加入結(jié)束字符串
// sc.pipeline().addLast(new FixedLengthFrameDecoder(200));第二種定長(zhǎng)
sc.pipeline().addLast(new StringDecoder());//定義接收類(lèi)型為字符串把ByteBuf轉(zhuǎn)成String
sc.pipeline().addLast(new ServertHandler());//在這里配置具體數(shù)據(jù)接收方法的處理
}
});
ChannelFuture future = b.bind(8765).sync();//綁定端口
future.channel().closeFuture().sync();//等待關(guān)閉(程序阻塞在這里等待客戶(hù)端請(qǐng)求)
bossGroup.shutdownGracefully();//關(guān)閉線(xiàn)程
workerGroup.shutdownGracefully();//關(guān)閉線(xiàn)程
}
}
1.在上面這個(gè)Server.java中,我們都要定義兩個(gè)線(xiàn)程池,boss和worker,boss是用于管理連接到server端的client的連接數(shù)的線(xiàn)程池,而woeker是用于管理實(shí)際操作的線(xiàn)程池。
2.ServerBootstrap用一個(gè)ServerSocketChannelFactory 來(lái)實(shí)例化。ServerSocketChannelFactory 有兩種選擇,一種是NioServerSocketChannelFactory,一種是OioServerSocketChannelFactory。 前者使用NIO,后則使用普通的阻塞式IO。它們都需要兩個(gè)線(xiàn)程池實(shí)例作為參數(shù)來(lái)初始化,一個(gè)是boss線(xiàn)程池,一個(gè)是worker線(xiàn)程池。
3.然后使ServerBookstrap管理boss和worker線(xiàn)程池。并且設(shè)置各個(gè)緩沖區(qū)的大小。
4.這里的事件處理類(lèi)經(jīng)常會(huì)被用來(lái)處理一個(gè)最近的已經(jīng)接收的Channel。ChannelInitializer是一個(gè)特殊的處理類(lèi),他的目的是幫助使用者配置一個(gè)新的Channel。也許你想通過(guò)增加一些處理類(lèi)比如NettyServerHandler來(lái)配置一個(gè)新的Channel 或者其對(duì)應(yīng)的ChannelPipeline來(lái)實(shí)現(xiàn)你的網(wǎng)絡(luò)程序。 當(dāng)你的程序變的復(fù)雜時(shí),可能你會(huì)增加更多的處理類(lèi)到pipline上,然后提取這些匿名類(lèi)到最頂層的類(lèi)上。
5.在使用原始的encoder、decoder的情況下,Netty發(fā)送接收數(shù)據(jù)都是按照ByteBuf的形式,其它形式都是不合法的。 而在上面這個(gè)Socket中,我使用sc.pipeline().addLast()這個(gè)方法設(shè)置了接收為字符串類(lèi)型,注意:只能設(shè)置接收為字符串類(lèi)型,發(fā)送還是需要發(fā)送ByteBuf類(lèi)型的數(shù)據(jù)。而且在這里我還設(shè)置了以$_為結(jié)尾的字符串就代表了本次請(qǐng)求字符串的結(jié)束。
6.通過(guò)b.bind綁定端口,用于監(jiān)聽(tīng)的端口號(hào)。
ServerHandler.java
public class ServertHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("server"+body);//前面已經(jīng)定義了接收為字符串,這里直接接收字符串就可以
//服務(wù)端給客戶(hù)端的響應(yīng)
String response= " hi client!$_";//發(fā)送的數(shù)據(jù)以定義結(jié)束的字符串結(jié)尾
ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));//發(fā)送必須還是ByteBuf類(lèi)型
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
ServertHandler繼承自 ChannelHandlerAdapter,這個(gè)類(lèi)實(shí)現(xiàn)了ChannelHandler接口,ChannelHandler提供了許多事件處理的接口方法,然后你可以覆蓋這些方法?,F(xiàn)在僅僅只需要繼承ChannelHandlerAdapter類(lèi)而不是你自己去實(shí)現(xiàn)接口方法。
1.由于我們?cè)賡erver端開(kāi)始的時(shí)候已經(jīng)定義了接收類(lèi)型為String,所以在這里我們接收到的msg直接強(qiáng)轉(zhuǎn)成String就可以了。同時(shí)也要定義以什么為一次請(qǐng)求的結(jié)尾。
Client.java
public class Client {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup worker = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(worker)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,buf));
sc.pipeline().addLast(new StringDecoder());
sc.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture f=b.connect("127.0.0.1",8765).sync();
f.channel().writeAndFlush(Unpooled.copiedBuffer(" hi server2$_".getBytes()));
f.channel().writeAndFlush(Unpooled.copiedBuffer(" hi server3$_".getBytes()));
f.channel().writeAndFlush(Unpooled.copiedBuffer(" hi server4$_".getBytes()));
f.channel().closeFuture().sync();
worker.shutdownGracefully();
}
}
client端和Socket端幾乎代碼相同,只是client端用的不是ServerBootstrap而是Bootstrap來(lái)管理連接。這里沒(méi)什么好說(shuō)的。
ClientHandler.java
public class ClientHandler extends ChannelHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
System.out.println("client"+msg.toString());
} finally {
ReferenceCountUtil.release(msg);//釋放緩沖區(qū)
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
ClientHandler和ServertHandler代碼和原理也是一樣,只是在client端我們要釋放緩沖區(qū)。為什么在ServerHandler我們不需要釋放呢 ?因?yàn)樵赟ervertHandler我們調(diào)用ctx.writeAndFlush方法的時(shí)候,這個(gè)方法默認(rèn)已經(jīng)幫我們釋放了緩沖區(qū)。
總結(jié)
以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作能帶來(lái)一定的幫助,如果有疑問(wèn)大家可以留言交流,謝謝大家對(duì)腳本之家的支持。
相關(guān)文章
Spring中常見(jiàn)的7種BeanDefinition詳解
在?Spring?容器中,我們廣泛使用的是一個(gè)一個(gè)的?Bean,BeanDefinition?從名字上就可以看出是關(guān)于?Bean?的定義,下面就跟隨小編一起深入了解一下常見(jiàn)的7中BeanDefinition吧2023-09-09
Java日期毫秒值和常見(jiàn)日期時(shí)間格式相互轉(zhuǎn)換方法
這篇文章主要給大家介紹了關(guān)于Java日期毫秒值和常見(jiàn)日期時(shí)間格式相互轉(zhuǎn)換的相關(guān)資料,在Java的日常開(kāi)發(fā)中,會(huì)隨時(shí)遇到需要對(duì)時(shí)間處理的情況,文中給出了詳細(xì)的示例代碼,需要的朋友可以參考下2023-07-07
java開(kāi)源區(qū)塊鏈jdchain入門(mén)
這篇文章主要介紹了java開(kāi)源區(qū)塊鏈jdchain入門(mén),文中為大家講解了關(guān)于部署及組件遇到的一些問(wèn)題,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步2022-02-02
JAVA對(duì)list集合進(jìn)行排序Collections.sort()
這篇文章主要介紹了JAVA對(duì)list集合進(jìn)行排序Collections.sort(),需要的朋友可以參考下2017-01-01
Flink開(kāi)發(fā)IDEA環(huán)境搭建與測(cè)試的方法
這篇文章主要介紹了Flink開(kāi)發(fā)IDEA環(huán)境搭建與測(cè)試的方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-03-03
Java后臺(tái)開(kāi)發(fā)之表單提交之前驗(yàn)證
這篇文章主要介紹了Java后臺(tái)開(kāi)發(fā)之表單提交之前驗(yàn)證的實(shí)現(xiàn)代碼,非常不錯(cuò)具有參考借鑒價(jià)值,需要的朋友參考下吧2017-02-02

