java Nio使用NioSocket客戶端與服務(wù)端交互實現(xiàn)方式
NioSocket 客戶端與服務(wù)端交互實現(xiàn)
java Nio是jdk1.4新增的io方式—–nio(new IO),這種方式在目前來說算不算new,更合適的解釋應(yīng)該是non-block IO。
non-block是相對于傳統(tǒng)的io方式來講的。傳統(tǒng)的Io方式是阻塞的,我們拿網(wǎng)絡(luò)io來舉例,傳統(tǒng)的io模型如下:

服務(wù)端主線程負(fù)責(zé)不斷地server.accept(),如果沒有客戶端請求主線程就會阻塞,當(dāng)客戶端請求時,主線程會通過線程池創(chuàng)建一個新的線程執(zhí)行。
簡單解釋就是一個線程負(fù)責(zé)一個客戶端的socket,當(dāng)客戶端因網(wǎng)絡(luò)等原因傳遞速度慢的時候,服務(wù)端對應(yīng)的客戶端的線程就會等待,很浪費資源。
同時線程過少的話會影響服務(wù)的吞吐量,而線程過多的話由于上下文切換等原因會導(dǎo)致效率十分低下,傳統(tǒng)的io方式并不適合如今的網(wǎng)絡(luò)流量。
Nio的模型如下:

nio相比傳統(tǒng)的io模型,最大的特點是優(yōu)化了線程的使用。
nio通過selector可以使用一個線程去管理多個socket句柄,說是管理也不太合適,nio是采用的事件驅(qū)動模型,selector負(fù)責(zé)的是監(jiān)控各個連接句柄的狀態(tài),不是去輪詢每個句柄,而是在數(shù)據(jù)就緒后,將消息通知給selector,而具體的socket句柄管理則是采用多路復(fù)用的模型,交由操作系統(tǒng)來完成。
selector充當(dāng)?shù)氖且粋€消息的監(jiān)聽者,負(fù)責(zé)監(jiān)聽channel在其注冊的事件,這樣就可以通過一個線程完成了大量連接的管理,當(dāng)注冊的事件發(fā)生后,再調(diào)用相應(yīng)線程進(jìn)行處理。
這樣就不需要為每個連接都使用一個線程去維持長連接,減少了長連接的開銷,同時減少了上下文的切換提高了系統(tǒng)的吞吐量。
java Nio的組成
java Nio主要由三個核心部分組成:
- Buffer - Channel - Selector
所有的io的Nio都是從一個channel開始的,Channel有點類似于流,但是和流不同的是,channel是可以雙向讀寫的。Channel有幾種類型,主要包含文件io操作和網(wǎng)絡(luò)io:
- FileChannel (文件io) - DatagramChannel (udp數(shù)據(jù)報) - SocketChannel (tcp客戶端) - ServerSocketChannel (tcp服務(wù)端)
Buffer是一個中間緩存區(qū),數(shù)據(jù)可以從channel讀取到buffer,也可以從buffer寫到channel中,在java中,傳統(tǒng)方式與io的交互,需要將數(shù)據(jù)從堆內(nèi)存讀取到直接內(nèi)存中,然后交由c語言來調(diào)用系統(tǒng)服務(wù)完成io的交互。
而使用Buffer可以直接在直接內(nèi)存中開辟內(nèi)存區(qū)域,減少了io復(fù)制的操作,從而提高了io操作的效率。
#基本數(shù)據(jù)類型的buffer - ByteBuffer - CharBuffer - DoubleBuffer - FloatBuffer - IntBuffer - LongBuffer - ShortBuffer #文件內(nèi)存映射buffer - MappedByteBuffer #直接內(nèi)存區(qū)buffer - DirectBuffer
Selector允許單個線程處理多個channel,可以將多個channel教給selector管理,并注冊相應(yīng)的事件,而selector則采用事件驅(qū)動的方式,當(dāng)注冊的事件就緒后,調(diào)用相應(yīng)的相應(yīng)的線程處理該時間,不用使用線程去維持長連接,減少了線程的開銷。
Selector通過靜態(tài)工廠的open方法建立,然后通過channel的register注冊到Channel上。
注冊后通過select方法等待請求,select請求有l(wèi)ong類型參數(shù),代表等待時間,如果等待時間內(nèi)接受到操作請求,則返回可以操作請求的數(shù)量,否則超時往下走。
傳入?yún)?shù)為零或者無參方法,則會采用阻塞模式知道有相應(yīng)請求。
收到請求后調(diào)用selectedKeys返回SelectionKey的集合。
SelectionKey保存了處理當(dāng)前請求的Channel和Selector,并且提供了不同的操作類型。
SelectionKey的操作有四種:
- SelectionKey.OP_CONNECT - SelectionKey.OP_ACCEPT - SelectionKey.OP_READ - SelectionKey.OP_WRITE
下面為一個客戶端與服務(wù)端實用NioSocket交互的簡單例子:
//對selectionKey事件的處理
/**
* description:
*
* @author wkGui
*/
interface ServerHandlerBs {
void handleAccept(SelectionKey selectionKey) throws IOException;
String handleRead(SelectionKey selectionKey) throws IOException;
}
/**
* description:
*
* @author wkGui
*/
public class ServerHandlerImpl implements ServerHandlerBs {
private int bufferSize = 1024;
private String localCharset = "UTF-8";
public ServerHandlerImpl() {
}
public ServerHandlerImpl(int bufferSize) {
this(bufferSize, null);
}
public ServerHandlerImpl(String localCharset) {
this(-1, localCharset);
}
public ServerHandlerImpl(int bufferSize, String localCharset) {
this.bufferSize = bufferSize > 0 ? bufferSize : this.bufferSize;
this.localCharset = localCharset == null ? this.localCharset : localCharset;
}
@Override
public void handleAccept(SelectionKey selectionKey) throws IOException {
//獲取channel
SocketChannel socketChannel = ((ServerSocketChannel) selectionKey.channel()).accept();
//非阻塞
socketChannel.configureBlocking(false);
//注冊selector
socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufferSize));
System.out.println("建立請求......");
}
@Override
public String handleRead(SelectionKey selectionKey) throws IOException {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
String receivedStr = "";
if (socketChannel.read(buffer) == -1) {
//沒讀到內(nèi)容關(guān)閉
socketChannel.shutdownOutput();
socketChannel.shutdownInput();
socketChannel.close();
System.out.println("連接斷開......");
} else {
//將channel改為讀取狀態(tài)
buffer.flip();
//按照編碼讀取數(shù)據(jù)
receivedStr = Charset.forName(localCharset).newDecoder().decode(buffer).toString();
buffer.clear();
//返回數(shù)據(jù)給客戶端
buffer = buffer.put(("received string : " + receivedStr).getBytes(localCharset));
//讀取模式
buffer.flip();
socketChannel.write(buffer);
//注冊selector 繼續(xù)讀取數(shù)據(jù)
socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufferSize));
}
return receivedStr;
}
}
//服務(wù)端server類
/**
* description:
*
* @author wkGui
*/
public class NioSocketServer {
private volatile byte flag = 1;
public void setFlag(byte flag) {
this.flag = flag;
}
public void start() {
//創(chuàng)建serverSocketChannel,監(jiān)聽8888端口
try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
serverSocketChannel.socket().bind(new InetSocketAddress(8888));
//設(shè)置為非阻塞模式
serverSocketChannel.configureBlocking(false);
//為serverChannel注冊selector
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服務(wù)端開始工作:");
//創(chuàng)建消息處理器
ServerHandlerBs handler = new ServerHandlerImpl(1024);
while (flag == 1) {
selector.select();
System.out.println("開始處理請求 : ");
//獲取selectionKeys并處理
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
try {
//連接請求
if (key.isAcceptable()) {
handler.handleAccept(key);
}
//讀請求
if (key.isReadable()) {
System.out.println(handler.handleRead(key));
}
} catch (IOException e) {
e.printStackTrace();
}
//處理完后移除當(dāng)前使用的key
keyIterator.remove();
}
System.out.println("完成請求處理。");
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
//server端啟動類
/**
* description:
*
* @author wkGui
*/
public class ServerMain {
public static void main(String[] args) {
NioSocketServer server = new NioSocketServer();
new Thread(() -> {
try {
Thread.sleep(10*60*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
server.setFlag((byte) 0);
}
}).start();
server.start();
}
}
//客戶端client類
/**
* description:
*
* @author wkGui
*/
public class NioSocketClient {
public void start() {
try (SocketChannel socketChannel = SocketChannel.open()) {
//連接服務(wù)端socket
SocketAddress socketAddress = new InetSocketAddress("localhost", 8888);
socketChannel.connect(socketAddress);
int sendCount = 0;
ByteBuffer buffer = ByteBuffer.allocate(1024);
//這里最好使用selector處理 這里只是為了寫的簡單
while (sendCount < 10) {
buffer.clear();
//向服務(wù)端發(fā)送消息
buffer.put(("current time : " + System.currentTimeMillis()).getBytes());
//讀取模式
buffer.flip();
socketChannel.write(buffer);
buffer.clear();
//從服務(wù)端讀取消息
int readLenth = socketChannel.read(buffer);
//讀取模式
buffer.flip();
byte[] bytes = new byte[readLenth];
buffer.get(bytes);
System.out.println(new String(bytes, "UTF-8"));
buffer.clear();
sendCount++;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
//client啟動類
/**
* description:
*
* @author wkGui
*/
public class ClientMain {
public static void main(String[] args) {
new NioSocketClient().start();
}
}
Java NIO 實現(xiàn) WebSocket 協(xié)議
WebSocket協(xié)議
WebSocket是一種在單個TCP連接上進(jìn)行全雙工通信的協(xié)議。 WebSocket使得客戶端和服務(wù)器之間的數(shù)據(jù)交換變得更加簡單,允許服務(wù)端主動向客戶端推送數(shù)據(jù)。在WebSocket API中,瀏覽器和服務(wù)器只需要完成一次握手,兩者之間就直接可以創(chuàng)建持久性的連接,并進(jìn)行雙向數(shù)據(jù)傳輸。
WebSocket協(xié)議相比于Http協(xié)議來說,最大的特點就是可以實現(xiàn)服務(wù)端主動向客戶端發(fā)送消息。在WebSocket出現(xiàn)之前,如果客戶端想實時獲取服務(wù)端的消息,就需要使用AJAX輪詢,查詢是否有消息,這樣就很消耗服務(wù)器資源和帶寬。但是用WebSocket就可以實現(xiàn)服務(wù)端主動向客戶端發(fā)送數(shù)據(jù),并且只需要占用一個TCP連接,節(jié)省了資源和帶寬。

WebSocket連接建立過程
為了建立一個WebSocket連接,客戶端瀏覽器首先要向服務(wù)器發(fā)起一個HTTP請求,這個請求和通常的HTTP請求不同,包含了一些附加的頭信息,其中附加頭信息“Upgrade: WebSocket” 表明這是一個申請協(xié)議升級的HTTP請求。服務(wù)器端解析這些附加的信息頭,然后生成應(yīng)答消息返回給客戶端,客戶端和服務(wù)端的WebSocket連接就建立了。之后就可以使用WebSocket協(xié)議的格式來雙向發(fā)送消息。
建立連接時發(fā)送的HTTP請求頭:

返回的HTTP響應(yīng)頭:

在響應(yīng)頭中的 Sec-WebSocket-Accept 時通過Sec-WebSocket-Key構(gòu)造出來的。首先在Sec-WebSocket-Key后接上一個258EAFA5-E914-47DA-95CA-C5AB0DC85B11,然后再進(jìn)行SHA1摘要得到160位數(shù)據(jù)在,在使用BASE64進(jìn)行編碼,最后得到的就是Sec-WebSocket-Accept。
WebSocket數(shù)據(jù)發(fā)送過程
WebSocket數(shù)據(jù)發(fā)送的幀格式如下所示:

FIN - 1bit
在數(shù)據(jù)發(fā)送的過程中,可能會分片發(fā)送,F(xiàn)IN表示是否為最后一個分片。如果發(fā)生了分片,則1表示時最后一個分片;不能再分片的情況下,這個標(biāo)志總是為1。
RSV1 RSV2 RSV3 - 1bit each
用于擴展,不使用擴展時需要為全0;非零時通信雙方必須協(xié)商好擴展。這里我們用不上。
OPCODE - 4bits
用于表示所傳送數(shù)據(jù)的類型,也就是payload中的數(shù)據(jù)。
| 數(shù)值 | 含義 |
|---|---|
| 0x0 | 附加數(shù)據(jù)幀 |
| 0x1 | 文本數(shù)據(jù)幀 |
| 0x2 | 二進(jìn)制數(shù)據(jù)幀 |
| 0x3-0x7 | 保留 |
| 0x8 | 關(guān)閉連接幀 |
| 0x9 | ping幀 |
| 0xA | pong幀 |
| 0xB-0xF | 保留 |
MASK - 1bit
用于表示payload是否被進(jìn)行了掩碼運算,1表示使用掩碼,0表示不使用掩碼。從客戶端發(fā)送向服務(wù)端的數(shù)據(jù)幀必須使用掩碼。
Payload length 7 bits,7+16 bits or 7+64 bits
用于表示payload的長度,有以下三種情況:
| Payload length 表示的大小 | payload的長度 |
|---|---|
| 0 - 125 | Payload length 大小 |
| 126 | 之后的2個字節(jié)表示的無符號整數(shù) |
| 127 | 之后的8個字節(jié)表示的無符號整數(shù) |
Masking-key - 0 or 4 bytes
32 bit長的掩碼,如果MASK為1,則幀中就存在這一個字段,在解析payload時,需要進(jìn)行使用32長掩碼進(jìn)行異或操作,之后才能得到正確結(jié)果。
Java NIO 實現(xiàn)
利用Java NIO 來實現(xiàn)一個聊天室。部分代碼如下。
NIO的常規(guī)代碼:
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();
if (key.isAcceptable()) {
handleAccept(key);
}
if (key.isReadable()) {
handleRead(key);
}
}
接受連接:
public void handleAccept(SelectionKey key) {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc;
try {
sc = ssc.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
System.out.println(String.format("[server] -- client %s connected.", sc.getRemoteAddress().toString()));
} catch (IOException e) {
System.out.println(String.format("[server] -- error occur when accept: %s.", e.getMessage()));
key.cancel();
}
}
讀取通道中的數(shù)據(jù):
public void handleRead(SelectionKey key) {
SocketChannel sc = (SocketChannel) key.channel();
Client client = (Client) key.attachment();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 如果是第一次連接進(jìn)來,就需要創(chuàng)建一個客戶端對象,存儲起來
if (client == null) {
client = new Client(sc);
clients.add(client);
key.attach(client);
byteBuffer.clear();
// 如果連接還沒有建立,就是要HTTP建立連接
try {
sc.read(byteBuffer);
byteBuffer.flip();
String response = WebSocketHandler.getResponse(new String(byteBuffer.array()));
byteBuffer.clear();
byteBuffer.put(response.getBytes());
byteBuffer.flip();
while (byteBuffer.hasRemaining()) {
sc.write(byteBuffer);
}
} catch (IOException e) {
System.out.println(String.format("[server] -- error occur when read: %s.", e.getMessage()));
}
String message = "[系統(tǒng)消息] " + client.toString() + " 加入了群聊";
broadcast(message.getBytes(), client);
}
byteBuffer.clear();
int read = 0;
try {
read = sc.read(byteBuffer);
if (read > 0) {
byteBuffer.flip();
int opcode = byteBuffer.get() & 0x0f;
// 8表示客戶端關(guān)閉了連接
if (opcode == 8) {
System.out.println(String.format("[server] -- client %s connection close.", sc.getRemoteAddress()));
clients.remove(client);
String message = "[系統(tǒng)消息] " + client.toString() + " 退出了群聊";
broadcast(message.getBytes(), client);
sc.close();
key.cancel();
return;
}
// 只考慮了最簡單的payload長度情況。
int len = byteBuffer.get();
len &= 0x7f;
byte[] mask = new byte[4];
byteBuffer.get(mask);
byte[] payload = new byte[len];
byteBuffer.get(payload);
for (int i = 0; i < payload.length; i++) {
payload[i] ^= mask[i % 4];
}
System.out.println(String
.format("[server] -- client: [%s], send: [%s].", client.toString(), new String(payload)));
String message = String.format("[%s]: %s", client.toString(), new String(payload));
broadcast(message.getBytes(), client);
} else if (read == -1) {
System.out.println(String.format("[server] -- client %s connection close.", sc.getRemoteAddress()));
clients.remove(client);
String message = "[系統(tǒng)消息] " + client.toString() + " 退出了群聊";
broadcast(message.getBytes(), client);
sc.close();
key.cancel();
}
} catch (IOException e) {
System.out.println(String.format("[server] -- error occur when read: %s.", e.getMessage()));
}
}
使用HTTP建立WebSocket連接。
public class WebSocketHandler {
private static String APPEND_STRING = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
static class Header {
private Map<String, String> properties = new HashMap<>();
public String get(String key) {
return properties.get(key);
}
}
private WebSocketHandler() {}
private static Header phrase(String request) {
Header header = new Header();
String[] pros = request.split("\r\n");
for (String pro : pros) {
if (pro.contains(":")) {
int index = pro.indexOf(":");
String key = pro.substring(0, index).trim();
String value = pro.substring(index + 1).trim();
header.properties.put(key, value);
}
}
return header;
}
public static String getResponse(String request) {
Header header = phrase(request);
String acceptKey = header.get("Sec-WebSocket-Key") + APPEND_STRING;
MessageDigest sha1;
try {
sha1 = MessageDigest.getInstance("sha1");
sha1.update(acceptKey.getBytes());
acceptKey = new String(Base64.getEncoder().encode(sha1.digest()));
} catch (NoSuchAlgorithmException e) {
System.out.println("fail to encode " + e.getMessage());
return null;
}
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("HTTP/1.1 101 Switching Protocols\r\n").append("Upgrade: websocket\r\n")
.append("Connection: Upgrade\r\n").append("Sec-WebSocket-Accept: " + acceptKey + "\r\n")
.append("\r\n");
return stringBuilder.toString();
}
}
客戶端對象
/**
* @author XinHui Chen
* @date 2020/2/8 19:20
*/
public class Client {
private SocketChannel socketChannel = null;
private String id = null;
public SocketChannel getSocketChannel() {
return socketChannel;
}
public String getId() {
return id;
}
Client(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
this.id = UUID.randomUUID().toString();
}
@Override
public String toString() {
try {
return id + " " + socketChannel.getRemoteAddress().toString();
} catch (IOException e) {
System.out.println(e.getMessage());
return null;
}
}
}
結(jié)果
使用網(wǎng)頁和控制臺與服務(wù)端建立WebSocket連接,發(fā)送數(shù)據(jù)。兩個都能成功顯示。

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
Java讀寫鎖ReadWriteLock原理與應(yīng)用場景詳解
這篇文章主要介紹了Java讀寫鎖ReadWriteLock原理與應(yīng)用場景詳解,讀寫狀態(tài)的設(shè)計,寫鎖的獲取與釋放,鎖降級需要的朋友可以參考下2023-02-02
解析Idea為什么不推薦使用@Autowired進(jìn)行Field注入
這篇文章主要介紹了Idea不推薦使用@Autowired進(jìn)行Field注入的原因,網(wǎng)上文章大部分都是介紹兩者的區(qū)別,沒有提到為什么,當(dāng)時想了好久想出了可能的原因,今天來總結(jié)一下2022-05-05
SSH框架網(wǎng)上商城項目第7戰(zhàn)之整合Struts2和Json
SSH框架網(wǎng)上商城項目第7戰(zhàn)之整合Struts2和Json,打通EasyUI和Struts2之間的交互,感興趣的小伙伴們可以參考一下2016-05-05

