基于Java?NIO編寫一個簡單版Netty服務(wù)端
前置知識
NIO
- NIO 一般指 同步非阻塞 IO,同樣用于**描述程序訪問數(shù)據(jù)方式 **的還有BIO(同步阻塞)、AIO(異步非阻塞)
- 同步異步指獲取結(jié)果的方式,同步為主動去獲取結(jié)果,不管結(jié)果是否準(zhǔn)備好,異步為等待結(jié)果準(zhǔn)備好的通知
- 阻塞非阻塞是線程在結(jié)果沒有到來之前,是否進(jìn)行等待,阻塞為進(jìn)行等待,非阻塞則不進(jìn)行等待
- NIO 主動地去獲取結(jié)果,但是在結(jié)果沒有準(zhǔn)備好之前,不會進(jìn)行等待。而是通過一個 多路復(fù)用器 管理多個通道,由一個線程輪訓(xùn)地去檢查是否準(zhǔn)備好即可。在網(wǎng)絡(luò)編程中,多路復(fù)用器通常由操作系統(tǒng)提供,Linux中主要有 select、poll、epoll。同步非阻塞指線程不等待數(shù)據(jù)的傳輸,而是完成后由多路復(fù)用器通知,線程再將數(shù)據(jù)從內(nèi)核緩沖區(qū)拷貝到用戶空間內(nèi)存進(jìn)行處理。
Java NIO
基于 NIO 實現(xiàn)的網(wǎng)絡(luò)框架,可以用少量的線程,處理大量的連接,更適用于高并發(fā)場景。于是,Java提供了NIO包提供相關(guān)組件,用于實現(xiàn)同步非阻塞IO
核心三個類Channel、Buffer、Selector。Channel代表一個數(shù)據(jù)傳輸通道,但不進(jìn)行數(shù)據(jù)存取,有Buffer類進(jìn)行數(shù)據(jù)管理,Selector為一個復(fù)用器,管理多個通道
Bytebuffer
- 該類為NIO 包中用于操作內(nèi)存的抽象類,具體實現(xiàn)由HeapByteBuffer、DirectByteBuffer兩種
- HeapByteBuffer為堆內(nèi)內(nèi)存,底層通過 byte[ ] 存取數(shù)據(jù)
- DirectByteBuffer 為堆外內(nèi)存,通過JDK提供的 Unsafe類去存??;同時創(chuàng)建對象會關(guān)聯(lián)的一個Cleaner對象,當(dāng)對象被GC時,通過cleaner對象去釋放堆外內(nèi)存
各核心組件介紹
NioServer
為啟動程序類,監(jiān)聽端口,初始化Channel
下面為NIO模式下簡單服務(wù)端處理代碼
// 1、創(chuàng)建服務(wù)端Channel,綁定端口并配置非阻塞
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(6666));
serverSocketChannel.configureBlocking(false);
// 2、創(chuàng)建多路復(fù)用器selector,并將channel注冊到多路復(fù)用器上
// 不能直接調(diào)用channel的accept方法,因為屬于非阻塞,直接調(diào)用沒有新連接會直接返回
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 3、循環(huán)處理多路復(fù)用器的IO事件
while(true){
// 3.1、select屬于阻塞的方法,這里阻塞等待1秒
// 如果返回0,說明沒有事件處理
if (selector.select(1000) == 0){
System.out.println("服務(wù)器等待了1秒,無IO事件");
continue;
}
// 3.2、遍歷事件進(jìn)行處理
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while(iterator.hasNext()){
SelectionKey key = iterator.next();
// accept事件,說明有新的客戶端連接
if (key.isAcceptable()){
// 新建一個socketChannel,注冊到selector,并關(guān)聯(lián)buffer
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024));
System.out.println("客戶端連接:"+socketChannel.getRemoteAddress());
}
// read事件 (內(nèi)核緩沖區(qū)的數(shù)據(jù)準(zhǔn)備好了)
if(key.isReadable()){
SocketChannel channel = (SocketChannel)key.channel();
ByteBuffer byteBuffer = (ByteBuffer)key.attachment();
try {
// 將數(shù)據(jù)寫進(jìn)buffer
int readNum = channel.read(byteBuffer);
if (readNum == -1){
System.out.println("讀取-1時,表示IO流已結(jié)束");
channel.close();
break;
}
// 打印buffer
byteBuffer.flip();
byte[] bytes = new byte[readNum];
byteBuffer.get(bytes, 0, readNum);
System.out.println("讀取到數(shù)據(jù):" + new String(bytes));
} catch (IOException e) {
System.out.println("讀取發(fā)生異常,廣播socket");
channel.close();
}
}
// write事件 (操作系統(tǒng)有內(nèi)存寫出了)
if (key.isWritable()){
SocketChannel channel = (SocketChannel)key.channel();
// 讀取read時暫存數(shù)據(jù)
byte[] bytes = (byte[])key.attachment();
if (bytes != null){
System.out.println("可寫事件發(fā)生,寫入數(shù)據(jù): " + new String(bytes));
channel.write(ByteBuffer.wrap(bytes));
}
// 清空暫存數(shù)據(jù),并切換成關(guān)注讀事件
key.attach(null);
key.interestOps(SelectionKey.OP_READ);
}
iterator.remove();
}
}
EventLoop
處理 Channel 中數(shù)據(jù)的讀寫
- 在上面的Server中,大量并發(fā)時單線程地處理讀寫事件會導(dǎo)致延遲,因此將讀寫處理抽取出來,可利用多線程實現(xiàn)高并發(fā)
- 一個EventLoop會關(guān)聯(lián)一個selector,只會處理這個selector上的Channel
public class EventLoop2 implements Runnable{
private final Thread thread;
/**
* 復(fù)用器,當(dāng)前線程只處理這個復(fù)用器上的channel
*/
public Selector selector;
/**
* 待處理的注冊任務(wù)
*/
private final Queue<Runnable> queue = new LinkedBlockingQueue<>();
/**
* 初始化復(fù)用器,線程啟動
* @throws IOException
*/
public EventLoop2() throws IOException {
this.selector = SelectorProvider.provider().openSelector();
this.thread = new Thread(this);
thread.start();
}
/**
* 將通道注冊給當(dāng)前的線程處理
* @param socketChannel
* @param keyOps
*/
public void register(SocketChannel socketChannel,int keyOps){
// 將注冊新的socketChannel到當(dāng)前selector封裝成一個任務(wù)
queue.add(()->{
try {
MyChannel myChannel = new MyChannel(socketChannel, this);
SelectionKey key = socketChannel.register(selector, keyOps);
key.attach(myChannel);
} catch (Exception e){
e.printStackTrace();
}
});
// 喚醒阻塞等待的selector線程
selector.wakeup();
}
/**
* 循環(huán)地處理 注冊事件、讀寫事件
*/
@Override
public void run() {
while (!thread.isInterrupted()){
try {
int select = selector.select(1000);
// 處理注冊到當(dāng)前selector的事件
if (select == 0){
Runnable task;
while ((task = queue.poll()) != null){
task.run();
}
continue;
}
// 處理讀寫事件
System.out.println("服務(wù)器收到讀寫事件,select:" + select);
processReadWrite();
}catch (Exception e){
e.printStackTrace();
}
}
}
/**
* 處理讀寫事件
* @throws Exception
*/
private void processReadWrite() throws Exception{
System.out.println(Thread.currentThread() + "開始監(jiān)聽讀寫事件");
// 3.2、遍歷事件進(jìn)行處理
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while(iterator.hasNext()){
SelectionKey key = iterator.next();
MyChannel myChannel = (MyChannel)key.attachment();
if(key.isReadable()){
// 將數(shù)據(jù)讀進(jìn)buffer
myChannel.doRead(key);
}
if (key.isWritable()){
myChannel.doWrite(key);
}
iterator.remove();
}
}
}EventloopGroup
一組EventLoop,輪訓(xùn)地為eventLoop分配Channel
public class EventLoopGroup {
private EventLoop2[] children = new EventLoop2[1];
private AtomicInteger idx = new AtomicInteger(0);
public EventLoopGroup() throws IOException {
for (int i = 0; i < children.length; i++){
children[i] = new EventLoop2();
}
}
public EventLoop2 next(){
// 輪訓(xùn)每一個children
return children[idx.getAndIncrement() & (children.length - 1)];
}
public void register(SocketChannel channel,int ops){
next().register(channel,ops);
}
}
Channel
封裝了SocketChannel 和 Pipline,將從Channel讀寫的消息,沿著Pipline上的節(jié)點進(jìn)行處理
- 在上面EventLoop中,注冊Channel到對應(yīng)的Selector前,會進(jìn)行封裝,將自定義的Channel放在讀寫事件觸發(fā)時會返回的SelectionKey里面
- 同時提供了數(shù)據(jù)讀寫處理方法,讀寫事件觸發(fā)時調(diào)用該方法,數(shù)據(jù)會沿著pipline上去處理
public class MyChannel {
private SocketChannel channel;
private EventLoop2 eventLoop;
private Queue<ByteBuffer> writeQueue;
private PipLine pipLine;
/**
* 一個channel關(guān)聯(lián)一個eventLoop、一個pipLine、一個socketChannel、一個writeQueue
* @param channel
* @param eventLoop
*/
public MyChannel(SocketChannel channel, EventLoop2 eventLoop) {
this.channel = channel;
this.eventLoop = eventLoop;
this.writeQueue = new ArrayDeque<>();
this.pipLine = new PipLine(this,eventLoop);
this.pipLine.addLast(new MyHandler1());
this.pipLine.addLast(new MyHandler2());
}
/**
* 讀事件處理
* @param key
* @throws Exception
*/
public void doRead(SelectionKey key) throws Exception{
try {
ByteBuffer buffer = ByteBuffer.allocate(1024);
int readNum = channel.read(buffer);
if (readNum == -1){
System.out.println("讀取-1時,表示IO流已結(jié)束");
channel.close();
return;
}
// 轉(zhuǎn)成可讀狀態(tài)
buffer.flip();
// 消息放入pipLine,交給頭節(jié)點, 頭節(jié)點開始傳遞
pipLine.headContext.fireChannelRead(buffer);
} catch (IOException e) {
System.out.println("讀取發(fā)生異常,廣播socket");
channel.close();
}
}
/**
* 真正地寫出數(shù)據(jù),關(guān)注寫事件后,會觸發(fā)
* @param key
* @throws IOException
*/
public void doWrite(SelectionKey key) throws IOException{
ByteBuffer buffer;
while ((buffer =writeQueue.poll()) != null){
channel.write(buffer);
}
// 回復(fù)讀取狀態(tài)
key.interestOps(SelectionKey.OP_READ);
}
/**
* 寫出到隊列
* @param msg
*/
public void doWriteQueue(ByteBuffer msg){
writeQueue.add(msg);
}
/**
* 從最后一個節(jié)點進(jìn)行寫出,寫出到頭節(jié)點是調(diào)用doWriteQueue
* @param msg
*/
public void write(Object msg){
this.pipLine.tailContext.write(msg);
}
/**
* 從最后一個節(jié)點進(jìn)行flush,寫出到頭節(jié)點時調(diào)用doFlush
*/
public void flush(){
this.pipLine.tailContext.flush();
}
/**
* 關(guān)注寫事件,才能進(jìn)行真正地寫出
*/
public void doFlush(){
this.channel.keyFor(eventLoop.selector).interestOps(SelectionKey.OP_WRITE);
}
}Handler 和 HandlerContext
handler 接口定義了可以擴(kuò)展處理的消息,由開發(fā)人員實現(xiàn)具體的處理
handlerContext 類封裝了handler的實現(xiàn)類,將handler的上一個節(jié)點和下一個節(jié)點,讓消息可以延者鏈表傳遞
public interface Handler {
/**
* 讀取數(shù)據(jù)處理
* @param ctx
* @param msg
*/
void channelRead(HandlerContext ctx,Object msg);
/**
* 寫出數(shù)據(jù)
* @param ctx
* @param msg
*/
void write(HandlerContext ctx,Object msg);
/**
* 刷下數(shù)據(jù)
* @param ctx
*/
void flush(HandlerContext ctx);
}
public class HandlerContext {
private Handler handler;
MyChannel channel;
HandlerContext prev;
HandlerContext next;
public HandlerContext(Handler handler, MyChannel channel) {
this.handler = handler;
this.channel = channel;
}
/**
* 讀消息的傳遞,從頭節(jié)點開始往后傳
* @param msg
*/
public void fireChannelRead(Object msg){
HandlerContext next = this.next;
if (next != null){
next.handler.channelRead(next,msg);
}
}
/**
* 從尾節(jié)點開始往前傳
* @param msg
*/
public void write(Object msg){
HandlerContext prev = this.prev;
if (prev != null){
prev.handler.write(prev,msg);
}
}
/**
* 從尾節(jié)點開始往前傳
*/
public void flush(){
HandlerContext prev = this.prev;
if (prev != null){
prev.handler.flush(prev);
}
}
}Pipline
本質(zhì)是鏈表,包含了頭尾節(jié)點的HandlerContext,提供方法給開發(fā)人員加節(jié)點
public class PipLine {
private MyChannel channel;
private EventLoop2 eventLoop;
public HandlerContext headContext;
public HandlerContext tailContext;
public PipLine(MyChannel channel, EventLoop2 eventLoop) {
this.channel = channel;
this.eventLoop = eventLoop;
PipHandler headHandler = new PipHandler();
this.headContext = new HandlerContext(headHandler,channel);
PipHandler tailHandler = new PipHandler();
this.tailContext = new HandlerContext(tailHandler,channel);
// 構(gòu)建鏈表
this.headContext.next = this.tailContext;
this.tailContext.prev = this.headContext;
}
public void addLast(Handler handler){
HandlerContext curr = new HandlerContext(handler, channel);
// 連接在倒數(shù)第二個后面
HandlerContext lastButOne = this.tailContext.prev;
lastButOne.next = curr;
curr.prev = lastButOne;
// 連接在最后一個前面
curr.next = tailContext;
tailContext.prev = curr;
}
public static class PipHandler implements Handler{
@Override
public void channelRead(HandlerContext ctx, Object msg) {
System.out.println("接收"+(String) msg +"進(jìn)行資源釋放");
}
@Override
public void write(HandlerContext ctx, Object msg) {
System.out.println("寫出"+msg.toString());
}
@Override
public void flush(HandlerContext ctx) {
System.out.println("flush");
}
}
}到此這篇關(guān)于基于Java NIO編寫一個簡單版Netty服務(wù)端的文章就介紹到這了,更多相關(guān)Java NIO編寫Netty服務(wù)端內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
在CentOS系統(tǒng)中檢測Java安裝及運行jar應(yīng)用的方法
這篇文章主要介紹了在CentOS系統(tǒng)中檢測Java安裝及運行jar應(yīng)用的方法,同樣適用于Fedora等其他RedHat系的Linux系統(tǒng),需要的朋友可以參考下2015-06-06

