Java實(shí)現(xiàn)非阻塞式服務(wù)器的示例代碼
1.創(chuàng)建阻塞的服務(wù)器
當(dāng) ServerSocketChannel 與 SockelChannel 采用默認(rèn)的阻塞模式時(shí),為了同時(shí)處理多個(gè)客戶的連接,必須使用多線程
public class EchoServer {
private int port = 8000;
private ServerSocketChannel serverSocketChannel = null;
private ExecutorService executorService; //線程池
private static final int POOL_MULTIPLE = 4; //線程池中工作線程的數(shù)目
public EchoServer() throws IOException {
//創(chuàng)建一個(gè)線程池
executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors() * POOL_MULTIPLE);
//創(chuàng)建一個(gè)ServerSocketChannel對(duì)象
serverSocketChannel = ServerSocketChannel.open();
//使得在同一個(gè)主機(jī)上關(guān)閉了服務(wù)器程序,緊接著再啟動(dòng)該服務(wù)器程序時(shí),可以順利綁定相同的端口
serverSocketChannel.socket().setReuseAddress(true);
//把服務(wù)器進(jìn)程與一個(gè)本地端口綁定
serverSocketChannel.socket().bind(new InetSocketAddress(port));
System.out.println("服務(wù)器啟動(dòng)");
}
public void service() {
while (true) {
SocketChannel socketChannel = null;
try {
socketChannel = serverSocketChannel.accept();
//處理客戶連接
executorService.execute(new Handler(socketChannel));
} catch(IOException e) {
e.printStackTrace();
}
}
}
public static void main(String args[])throws IOException {
new EchoServer().service();
}
//處理客戶連按
class Handler implements Runnable {
private SocketChannel socketChannel;
public Handler(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
public void run() {
handle(socketChannel);
}
public void handle(SocketChannel socketChannel) {
try {
//獲得與socketChannel關(guān)聯(lián)的Socket對(duì)象
Socket socket = socketChannel.socket();
System.out.println("接收到客戶連接,來(lái)自:" + socket.getInetAddress() + ":" + socket.getPort());
BufferedReader br = getReader(socket);
PrintWriter pw = getWriter(socket);
String msg = null;
while ((msg = br.readLine()) != null) {
System.out.println(msg);
pw.println(echo(msg));
if (msg.equals("bye")) {
break;
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if(socketChannel != null) {
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
private PrintWriter getWriter(Socket socket) throws IOException {
OutputStream socketOut = socket.getOutputStream();
return new PrintWriter(socketOut,true);
}
private BufferedReader getReader(Socket socket) throws IOException {
InputStream socketIn = socket.getInputStream();
return new BufferedReader(new InputStreamReader(socketIn));
}
public String echo(String msg) {
return "echo:" + msg;
}
}2.創(chuàng)建非阻塞的服務(wù)器
在非阻塞模式下,EchoServer 只需要啟動(dòng)一個(gè)主線程,就能同時(shí)處理三件事:
- 接收客戶的連接
- 接收客戶發(fā)送的數(shù)據(jù)
- 向客戶發(fā)回響應(yīng)數(shù)據(jù)
EchoServer 委托 Selector 來(lái)負(fù)責(zé)監(jiān)控接收連接就緒事件、讀就緒事件和寫就緒事件如果有特定事件發(fā)生,就處理該事件
// 創(chuàng)建一個(gè)Selector對(duì)象 selector = Selector.open(); //創(chuàng)建一個(gè)ServerSocketChannel對(duì)象 serverSocketChannel = ServerSocketChannel.open(); //使得在同一個(gè)主機(jī)上關(guān)閉了服務(wù)器程序,緊接著再啟動(dòng)該服務(wù)器程序時(shí) //可以順利綁定到相同的端口 serverSocketChannel.socket().setReuseAddress(true); //使ServerSocketChannel工作于非阻塞模式 serverSocketChannel.configureBlocking(false): //把服務(wù)器進(jìn)程與一個(gè)本地端口綁定 serverSocketChannelsocket().bind(new InetSocketAddress(port));
EchoServer 類的 service() 方法負(fù)責(zé)處理本節(jié)開頭所說(shuō)的三件事,體現(xiàn)其主要流程的代碼如下:
public void service() throws IOException {
serverSocketChannel.reqister(selector, SelectionKey.OP_ACCEPT);
//第1層while循環(huán)
while(selector.select() > 0) {
//獲得Selector的selected-keys集合
Set readyKeys = selector.selectedKeys();
Iterator it = readyKeys.iterator();
//第2層while循環(huán)
while (it.hasNext()) {
SelectionKey key = null;
//處理SelectionKey
try {
//取出一個(gè)SelectionKey
key = (SelectionKey) it.next();
//把 SelectionKey從Selector 的selected-key 集合中刪除
it.remove();
1f (key.isAcceptable()) { 處理接收連接就緒事件; }
if (key.isReadable()) { 處理讀就緒水件; }
if (key.isWritable()) { 處理寫就緒事件; }
} catch(IOException e) {
e.printStackTrace();
try {
if(key != null) {
//使這個(gè)SelectionKey失效
key.cancel();
//關(guān)閉與這個(gè)SelectionKey關(guān)聯(lián)的SocketChannel
key.channel().close();
}
} catch(Exception ex) {
e.printStackTrace();
}
}
}
}
}- 首先由
ServerSocketChannel向Selector注冊(cè)接收連接就緒事件,如果Selector監(jiān)控到該事件發(fā)生,就會(huì)把相應(yīng)的SelectionKey對(duì)象加入selected-keys集合 - 第一層 while 循環(huán),不斷詢問(wèn)
Selector已經(jīng)發(fā)生的事件,select()方法返回當(dāng)前相關(guān)事件已經(jīng)發(fā)生的SelectionKey的個(gè)數(shù),如果當(dāng)前沒(méi)有任何事件發(fā)生,該方法會(huì)阻塞下去,直到至少有一個(gè)事件發(fā)生。Selector的selectedKeys()方法返回selected-keys集合,它存放了相關(guān)事件已經(jīng)發(fā)生的SelectionKey對(duì)象 - 第二層 while 循環(huán),從
selected-keys集合中依次取出每個(gè)SelectionKey對(duì)象并從集合中刪除,,然后調(diào)用isAcceptable()、isReadable()和isWritable()方法判斷到底是哪種事件發(fā)生了,從而做出相應(yīng)的處理
2.1處理接收連接就緒事件
if (key.isAcceptable()) {
//獲得與SelectionKey關(guān)聯(lián)的ServerSocketChannel
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//獲得與客戶連接的SocketChannel
SocketChannel socketChannel = (SocketChannel) ssc.accept();
//把Socketchannel設(shè)置為非阻塞模式
socketChannel.configureBlocking(false);
//創(chuàng)建一個(gè)用于存放用戶發(fā)送來(lái)的數(shù)據(jù)的級(jí)沖區(qū)
ByteBuffer buffer = ByteBuffer.allocate(1024);
//Socketchannel向Selector注冊(cè)讀就緒事件和寫就緒事件
socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
}2.2處理讀就緒事件
public void receive(SelectionKey key) throws IOException {
//獲得與SelectionKey關(guān)聯(lián)的附件
ByteBuffer buffer = (ByteBuffer) key.attachment();
//獲得與SelectionKey關(guān)聯(lián)的Socketchannel
SocketChannel socketChannel = (SocketChannel)key.channel();
//創(chuàng)建一個(gè)ByteBuffer用于存放讀到的數(shù)據(jù)
ByteBuffer readBuff = ByteBuffer.allocate(32);
socketChannel.read(readBuff);
readBuff.flip();
//把buffer的極限設(shè)為容量
buffer.limit(buffer.capacity());
//把readBuff中的內(nèi)容拷貝到buffer
buffer.put(readBuff);
}2.3處理寫就緒事件
public void send(SelectionKey key) throws IOException {
//獲得與SelectionKey關(guān)聯(lián)的ByteBuffer
ByteBuffer buffer = (ByteBuffer) key.attachment();
//獲得與SelectionKey關(guān)聯(lián)的SocketChannel
SocketChannel socketChannel = (SocketChannel) key.channel();
buffer.flip();
//按照GBK編碼把buffer中的字節(jié)轉(zhuǎn)換為字符串
String data = decode(buffer);
//如果還沒(méi)有讀到一行數(shù)據(jù)就返回
if(data.indexOf("\r\n") == -1)
return;
//截取一行數(shù)據(jù)
String outputData = data.substring(0, data.indexOf("\n") + 1);
//把輸出的字符串按照GBK編碼轉(zhuǎn)換為字節(jié),把它放在outputBuffer中
ByteBuffer outputBuffer = encode("echo:" + outputData);
//輸出outputBuffer的所有字節(jié)
while(outputBuffer,hasRemaining())
socketChannel.write(outputBuffer);
//把outputData字符審按照GBK編碼,轉(zhuǎn)換為字節(jié),把它放在ByteBuffer
ByteBuffer temp = encode(outputData);
//把buffer的位置設(shè)為temp的極限
buffer.position(temp.limit()):
//刪除buffer已經(jīng)處理的數(shù)據(jù)
buffer.compact();
//如果已經(jīng)輸出了字符串“bye\r\n”,就使SelectionKey失效,并關(guān)閉SocketChannel
if(outputData.equals("bye\r\n")) {
key.cancel();
socketChannel.close();
}
}完整代碼如下:
public class EchoServer {
private int port = 8000;
private ServerSocketChannel serverSocketChannel = null;
private Selector selector;
private Charset charset = Charset.forName("GBK");
public EchoServer() throws IOException {
// 創(chuàng)建一個(gè)Selector對(duì)象
selector = Selector.open();
//創(chuàng)建一個(gè)ServerSocketChannel對(duì)象
serverSocketChannel = ServerSocketChannel.open();
//使得在同一個(gè)主機(jī)上關(guān)閉了服務(wù)器程序,緊接著再啟動(dòng)該服務(wù)器程序時(shí)
//可以順利綁定到相同的端口
serverSocketChannel.socket().setReuseAddress(true);
//使ServerSocketChannel工作于非阻塞模式
serverSocketChannel.configureBlocking(false):
//把服務(wù)器進(jìn)程與一個(gè)本地端口綁定
serverSocketChannelsocket().bind(new InetSocketAddress(port));
}
public void service() throws IOException {
serverSocketChannel.reqister(selector, SelectionKey.OP_ACCEPT);
//第1層while循環(huán)
while(selector.select() > 0) {
//獲得Selector的selected-keys集合
Set readyKeys = selector.selectedKeys();
Iterator it = readyKeys.iterator();
//第2層while循環(huán)
while (it.hasNext()) {
SelectionKey key = null;
//處理SelectionKey
try {
//取出一個(gè)SelectionKey
key = (SelectionKey) it.next();
//把 SelectionKey從Selector 的selected-key 集合中刪除
it.remove();
1f (key.isAcceptable()) {
//獲得與SelectionKey關(guān)聯(lián)的ServerSocketChannel
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//獲得與客戶連接的SocketChannel
SocketChannel socketChannel = (SocketChannel) ssc.accept();
//把Socketchannel設(shè)置為非阻塞模式
socketChannel.configureBlocking(false);
//創(chuàng)建一個(gè)用于存放用戶發(fā)送來(lái)的數(shù)據(jù)的級(jí)沖區(qū)
ByteBuffer buffer = ByteBuffer.allocate(1024);
//Socketchannel向Selector注冊(cè)讀就緒事件和寫就緒事件
socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
}
if (key.isReadable()) { receive(key); }
if (key.isWritable()) { send(key); }
} catch(IOException e) {
e.printStackTrace();
try {
if(key != null) {
//使這個(gè)SelectionKey失效
key.cancel();
//關(guān)閉與這個(gè)SelectionKey關(guān)聯(lián)的SocketChannel
key.channel().close();
}
} catch(Exception ex) {
e.printStackTrace();
}
}
}
}
}
public void receive(SelectionKey key) throws IOException {
//獲得與SelectionKey關(guān)聯(lián)的附件
ByteBuffer buffer = (ByteBuffer) key.attachment();
//獲得與SelectionKey關(guān)聯(lián)的Socketchannel
SocketChannel socketChannel = (SocketChannel)key.channel();
//創(chuàng)建一個(gè)ByteBuffer用于存放讀到的數(shù)據(jù)
ByteBuffer readBuff = ByteBuffer.allocate(32);
socketChannel.read(readBuff);
readBuff.flip();
//把buffer的極限設(shè)為容量
buffer.limit(buffer.capacity());
//把readBuff中的內(nèi)容拷貝到buffer
buffer.put(readBuff);
}
public void send(SelectionKey key) throws IOException {
//獲得與SelectionKey關(guān)聯(lián)的ByteBuffer
ByteBuffer buffer = (ByteBuffer) key.attachment();
//獲得與SelectionKey關(guān)聯(lián)的SocketChannel
SocketChannel socketChannel = (SocketChannel) key.channel();
buffer.flip();
//按照GBK編碼把buffer中的字節(jié)轉(zhuǎn)換為字符串
String data = decode(buffer);
//如果還沒(méi)有讀到一行數(shù)據(jù)就返回
if(data.indexOf("\r\n") == -1)
return;
//截取一行數(shù)據(jù)
String outputData = data.substring(0, data.indexOf("\n") + 1);
//把輸出的字符串按照GBK編碼轉(zhuǎn)換為字節(jié),把它放在outputBuffer中
ByteBuffer outputBuffer = encode("echo:" + outputData);
//輸出outputBuffer的所有字節(jié)
while(outputBuffer,hasRemaining())
socketChannel.write(outputBuffer);
//把outputData字符審按照GBK編碼,轉(zhuǎn)換為字節(jié),把它放在ByteBuffer
ByteBuffer temp = encode(outputData);
//把buffer的位置設(shè)為temp的極限
buffer.position(temp.limit()):
//刪除buffer已經(jīng)處理的數(shù)據(jù)
buffer.compact();
//如果已經(jīng)輸出了字符串“bye\r\n”,就使SelectionKey失效,并關(guān)閉SocketChannel
if(outputData.equals("bye\r\n")) {
key.cancel();
socketChannel.close();
}
}
//解碼
public String decode(ByteBuffer buffer) {
CharBuffer charBuffer = charset.decode(buffer);
return charBuffer.toStrinq();
}
//編碼
public ByteBuffer encode(String str) {
return charset.encode(str);
}
public static void main(String args[])throws Exception {
EchoServer server = new EchoServer();
server.service();
}
}3.阻塞模式與非阻塞模式混合使用
使用非阻塞模式時(shí),ServerSocketChannel 以及 SocketChannel 都被設(shè)置為非阻塞模式,這使得接收連接、接收數(shù)據(jù)和發(fā)送數(shù)據(jù)的操作都采用非阻塞模式,EchoServer 采用一個(gè)線程同時(shí)完成這些操作
假如有許多客戶請(qǐng)求連接,可以把接收客戶連接的操作單獨(dú)由一個(gè)線程完成,把接收數(shù)據(jù)和發(fā)送數(shù)據(jù)的操作由另一個(gè)線程完成,這可以提高服務(wù)器的并發(fā)性能
負(fù)責(zé)接收客戶連接的線程按照阻塞模式工作,如果收到客戶連接,就向 Selector 注冊(cè)讀就緒和寫就緒事件,否則進(jìn)入阻塞狀態(tài),直到接收到了客戶的連接。負(fù)責(zé)接收數(shù)據(jù)和發(fā)送數(shù)據(jù)的線程按照非阻塞模式工作,只有在讀就緒或?qū)懢途w事件發(fā)生時(shí),才執(zhí)行相應(yīng)的接收數(shù)據(jù)和發(fā)送數(shù)據(jù)操作
public class EchoServer {
private int port = 8000;
private ServerSocketChannel serverSocketChannel = null;
private Selector selector = null;
private Charset charset = Charset.forName("GBK");
public EchoServer() throws IOException {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().setReuseAddress(true);
serverSocketChannelsocket().bind(new InetSocketAddress(port));
}
public void accept() {
while(true) {
try {
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(1024);
synchronized(gate) {
selector.wakeup();
socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer);
}
} catch(IOException e) {
e.printStackTrace();
}
}
}
private Object gate=new Object();
public void service() throws IOException {
while(true) {
synchronized(gate){}
int n = selector.select();
if(n == 0) continue;
Set readyKeys = selector.selectedKeys();
Iterator it = readyKeys.iterator();
while (it.hasNext()) {
SelectionKey key = null;
try {
it.remove();
if (key.isReadable()) {
receive(key);
}
if (key.isWritable()) {
send(key);
}
} catch(IOException e) {
e.printStackTrace();
try {
if(key != null) {
key.cancel();
key.channel().close();
}
} catch(Exception ex) { e.printStackTrace(); }
}
}
}
}
public void receive(SelectionKey key) throws IOException {
...
}
public void send(SelectionKey key) throws IOException {
...
}
public String decode(ByteBuffer buffer) {
...
}
public ByteBuffer encode(String str) {
...
}
public static void main(String args[])throws Exception {
final EchoServer server = new EchoServer();
Thread accept = new Thread() {
public void run() {
server.accept();
}
};
accept.start();
server.service();
}
}注意一點(diǎn):主線程的 selector select() 方法和 Accept 線程的 register(...) 方法都會(huì)造成阻塞,因?yàn)樗麄兌紩?huì)操作 Selector 對(duì)象的共享資源 all-keys 集合,這有可能會(huì)導(dǎo)致死鎖
導(dǎo)致死鎖的具體情形是:Selector 中尚沒(méi)有任何注冊(cè)的事件,即 all-keys 集合為空,主線程執(zhí)行 selector.select() 方法時(shí)將進(jìn)入阻塞狀態(tài),只有當(dāng) Accept 線程向 Selector 注冊(cè)了事件,并且該事件發(fā)生后,主線程才會(huì)從 selector.select() 方法返回。然而,由于主線程正在 selector.select() 方法中阻塞,這使得 Acccept 線程也在 register() 方法中阻塞。Accept 線程無(wú)法向 Selector 注冊(cè)事件,而主線程沒(méi)有任何事件可以監(jiān)控,所以這兩個(gè)線程將永遠(yuǎn)阻塞下去
為了避免對(duì)共享資源的競(jìng)爭(zhēng),同步機(jī)制使得一個(gè)線程執(zhí)行 register() 時(shí),不允許另一個(gè)線程同時(shí)執(zhí)行 select() 方法,反之亦然
到此這篇關(guān)于Java實(shí)現(xiàn)非阻塞式服務(wù)器的示例代碼的文章就介紹到這了,更多相關(guān)Java服務(wù)器內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java模擬實(shí)現(xiàn)QQ三方登錄(單點(diǎn)登錄2.0)
這篇文章主要為大家詳細(xì)介紹了Java模擬實(shí)現(xiàn)QQ三方登錄,單點(diǎn)登錄2.0,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2020-06-06
Java之next()、nextLine()區(qū)別及問(wèn)題解決
這篇文章主要介紹了Java之next()、nextLine()區(qū)別及問(wèn)題解決,本篇文章通過(guò)簡(jiǎn)要的案例,講解了該項(xiàng)技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下2021-08-08
Java?18?新特性之Web服務(wù)器?jwebserver功能
JEP?408:?Simple?Web?Server,是這次Java?18推出的一個(gè)比較獨(dú)立的全新功能點(diǎn)。我們可以通過(guò)命令行工具來(lái)啟動(dòng)一個(gè)提供靜態(tài)資源訪問(wèn)的迷你Web服務(wù)器,本文通過(guò)一個(gè)構(gòu)建HTML頁(yè)面的例子,來(lái)嘗試一下jwebserver的功能2022-04-04
IntelliJ IDEA導(dǎo)入Gradle項(xiàng)目的方法
這篇文章主要介紹了IntelliJ IDEA導(dǎo)入Gradle項(xiàng)目的方法,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-03-03
Mybatis-Plus中使用@DS注解動(dòng)態(tài)選擇數(shù)據(jù)源的源碼解讀
這篇文章主要介紹了Mybatis-Plus中使用@DS注解動(dòng)態(tài)選擇數(shù)據(jù)源的源碼解讀,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-07-07
解決springboot 啟動(dòng)找不到主類的問(wèn)題
這篇文章主要介紹了解決springboot 啟動(dòng)找不到主類的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08

