Socket結(jié)合線程池使用實(shí)現(xiàn)客戶端和服務(wù)端通信demo
引導(dǎo)語(yǔ)
Socket 面試最終題一般都是讓你寫(xiě)一個(gè)簡(jiǎn)單的客戶端和服務(wù)端通信的例子,本文就帶大家一起來(lái)寫(xiě)這個(gè) demo。
1、要求
- 可以使用 Socket 和 ServiceSocket 以及其它 API;
- 寫(xiě)一個(gè)客戶端和服務(wù)端之間 TCP 通信的例子;
- 服務(wù)端處理任務(wù)需要異步處理;
- 因?yàn)榉?wù)端處理能力很弱,只能同時(shí)處理 5 個(gè)請(qǐng)求,當(dāng)?shù)诹鶄€(gè)請(qǐng)求到達(dá)服務(wù)器時(shí),需要服務(wù)器返回明確的錯(cuò)誤信息:服務(wù)器太忙了,請(qǐng)稍后重試~。
需求比較簡(jiǎn)單,唯一復(fù)雜的地方在于第四點(diǎn),我們需要對(duì)客戶端的請(qǐng)求量進(jìn)行控制,首先我們需要確認(rèn)的是,我們是無(wú)法控制客戶端發(fā)送的請(qǐng)求數(shù)的,所以我們只能從服務(wù)端進(jìn)行改造,比如從服務(wù)端進(jìn)行限流。
有的同學(xué)可能很快想到,我們應(yīng)該使用 ServerSocket 的 backlog 的屬性,把其設(shè)置成 5,但我們?cè)谏弦徽轮姓f(shuō)到 backlog 并不能準(zhǔn)確代表限制的客戶端連接數(shù),而且我們還要求服務(wù)端返回具體的錯(cuò)誤信息,即使 backlog 生效,也只會(huì)返回固定的錯(cuò)誤信息,不是我們定制的錯(cuò)誤信息。
我們好好想想,線程池似乎可以做這個(gè)事情,我們可以把線程池的 coreSize 和 maxSize 都設(shè)置成 4,把隊(duì)列大小設(shè)置成 1,這樣服務(wù)端每次收到請(qǐng)求后,會(huì)先判斷一下線程池中的隊(duì)列有沒(méi)有數(shù)據(jù),如果有的話,說(shuō)明當(dāng)前服務(wù)器已經(jīng)馬上就要處理第五個(gè)請(qǐng)求了,當(dāng)前請(qǐng)求就是第六個(gè)請(qǐng)求,應(yīng)該被拒絕。
正好線程池的加入也可以滿足第三點(diǎn),服務(wù)端的任務(wù)可以異步執(zhí)行。
2、客戶端代碼
客戶端的代碼比較簡(jiǎn)單,直接向服務(wù)器請(qǐng)求數(shù)據(jù)即可,代碼如下:
public class SocketClient {
private static final Integer SIZE = 1024;
private static final ThreadPoolExecutor socketPoll = new ThreadPoolExecutor(50, 50,
365L,
TimeUnit.DAYS,
new LinkedBlockingQueue<>(400));
@Test
public void test() throws InterruptedException {
// 模擬客戶端同時(shí)向服務(wù)端發(fā)送 6 條消息
for (int i = 0; i < 6; i++) {
socketPoll.submit(() -> {
send("localhost", 7007, "nihao");
});
}
Thread.sleep(1000000000);
}
/**
* 發(fā)送tcp
*
* @param domainName 域名
* @param port 端口
* @param content 發(fā)送內(nèi)容
*/
public static String send(String domainName, int port, String content) {
log.info("客戶端開(kāi)始運(yùn)行");
Socket socket = null;
OutputStream outputStream = null;
InputStreamReader isr = null;
BufferedReader br = null;
InputStream is = null;
StringBuffer response = null;
try {
if (StringUtils.isBlank(domainName)) {
return null;
}
// 無(wú)參構(gòu)造器初始化 Socket,默認(rèn)底層協(xié)議是 TCP
socket = new Socket();
socket.setReuseAddress(true);
// 客戶端準(zhǔn)備連接服務(wù)端,設(shè)置超時(shí)時(shí)間 10 秒
socket.connect(new InetSocketAddress(domainName, port), 10000);
log.info("TCPClient 成功和服務(wù)端建立連接");
// 準(zhǔn)備發(fā)送消息給服務(wù)端
outputStream = socket.getOutputStream();
// 設(shè)置 UTF 編碼,防止亂碼
byte[] bytes = content.getBytes(Charset.forName("UTF-8"));
// 輸出字節(jié)碼
segmentWrite(bytes, outputStream);
// 關(guān)閉輸出
socket.shutdownOutput();
log.info("TCPClient 發(fā)送內(nèi)容為 {}",content);
// 等待服務(wù)端的返回
socket.setSoTimeout(50000);//50秒還沒(méi)有得到數(shù)據(jù),直接斷開(kāi)連接
// 得到服務(wù)端的返回流
is = socket.getInputStream();
isr = new InputStreamReader(is);
br = new BufferedReader(isr);
// 從流中讀取返回值
response = segmentRead(br);
// 關(guān)閉輸入流
socket.shutdownInput();
//關(guān)閉各種流和套接字
close(socket, outputStream, isr, br, is);
log.info("TCPClient 接受到服務(wù)端返回的內(nèi)容為 {}",response);
return response.toString();
} catch (ConnectException e) {
log.error("TCPClient-send socket連接失敗", e);
throw new RuntimeException("socket連接失敗");
} catch (Exception e) {
log.error("TCPClient-send unkown errror", e);
throw new RuntimeException("socket連接失敗");
} finally {
try {
close(socket, outputStream, isr, br, is);
} catch (Exception e) {
// do nothing
}
}
}
/**
* 關(guān)閉各種流
*
* @param socket
* @param outputStream
* @param isr
* @param br
* @param is
* @throws IOException
*/
public static void close(Socket socket, OutputStream outputStream, InputStreamReader isr,
BufferedReader br, InputStream is) throws IOException {
if (null != socket && !socket.isClosed()) {
try {
socket.shutdownOutput();
} catch (Exception e) {
}
try {
socket.shutdownInput();
} catch (Exception e) {
}
try {
socket.close();
} catch (Exception e) {
}
}
if (null != outputStream) {
outputStream.close();
}
if (null != br) {
br.close();
}
if (null != isr) {
isr.close();
}
if (null != is) {
is.close();
}
}
/**
* 分段讀
*
* @param br
* @throws IOException
*/
public static StringBuffer segmentRead(BufferedReader br) throws IOException {
StringBuffer sb = new StringBuffer();
String line;
while ((line = br.readLine()) != null) {
sb.append(line);
}
return sb;
}
/**
* 分段寫(xiě)
*
* @param bytes
* @param outputStream
* @throws IOException
*/
public static void segmentWrite(byte[] bytes, OutputStream outputStream) throws IOException {
int length = bytes.length;
int start, end = 0;
for (int i = 0; end != bytes.length; i++) {
start = i == 0 ? 0 : i * SIZE;
end = length > SIZE ? start + SIZE : bytes.length;
length -= SIZE;
outputStream.write(bytes, start, end - start);
outputStream.flush();
}
}
}客戶端代碼中我們也用到了線程池,主要是為了并發(fā)模擬客戶端一次性發(fā)送 6 個(gè)請(qǐng)求,按照預(yù)期服務(wù)端在處理第六個(gè)請(qǐng)求的時(shí)候,會(huì)返回特定的錯(cuò)誤信息給客戶端。
以上代碼主要方法是 send 方法,主要處理像服務(wù)端發(fā)送數(shù)據(jù),并處理服務(wù)端的響應(yīng)。
3、服務(wù)端代碼
服務(wù)端的邏輯分成兩個(gè)部分,第一部分是控制客戶端的請(qǐng)求個(gè)數(shù),當(dāng)超過(guò)服務(wù)端的能力時(shí),拒絕新的請(qǐng)求,當(dāng)服務(wù)端能力可響應(yīng)時(shí),放入新的請(qǐng)求,第二部分是服務(wù)端任務(wù)的執(zhí)行邏輯。
3.1、對(duì)客戶端請(qǐng)求進(jìn)行控制
public class SocketServiceStart {
/**
* 服務(wù)端的線程池,兩個(gè)作用
* 1:讓服務(wù)端的任務(wù)可以異步執(zhí)行
* 2:管理可同時(shí)處理的服務(wù)端的請(qǐng)求數(shù)
*/
private static final ThreadPoolExecutor collectPoll = new ThreadPoolExecutor(4, 4,
365L,
TimeUnit.DAYS,
new LinkedBlockingQueue<>(
1));
@Test
public void test(){
start();
}
/**
* 啟動(dòng)服務(wù)端
*/
public static final void start() {
log.info("SocketServiceStart 服務(wù)端開(kāi)始啟動(dòng)");
try {
// backlog serviceSocket處理阻塞時(shí),客戶端最大的可創(chuàng)建連接數(shù),超過(guò)客戶端連接不上
// 當(dāng)線程池能力處理滿了之后,我們希望盡量阻塞客戶端的連接
// ServerSocket serverSocket = new ServerSocket(7007,1,null);
// 初始化服務(wù)端
ServerSocket serverSocket = new ServerSocket();
serverSocket.setReuseAddress(true);
// serverSocket.bind(new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), 80));
serverSocket.bind(new InetSocketAddress("localhost", 7007));
log.info("SocketServiceStart 服務(wù)端啟動(dòng)成功");
// 自旋,讓客戶端一直在取客戶端的請(qǐng)求,如果客戶端暫時(shí)沒(méi)有請(qǐng)求,會(huì)一直阻塞
while (true) {
// 接受客戶端的請(qǐng)求
Socket socket = serverSocket.accept();
// 如果隊(duì)列中有數(shù)據(jù)了,說(shuō)明服務(wù)端已經(jīng)到了并發(fā)處理的極限了,此時(shí)需要返回客戶端有意義的信息
if (collectPoll.getQueue().size() >= 1) {
log.info("SocketServiceStart 服務(wù)端處理能力到頂,需要控制客戶端的請(qǐng)求");
//返回處理結(jié)果給客戶端
rejectRequest(socket);
continue;
}
try {
// 異步處理客戶端提交上來(lái)的任務(wù)
collectPoll.submit(new SocketService(socket));
} catch (Exception e) {
socket.close();
}
}
} catch (Exception e) {
log.error("SocketServiceStart - start error", e);
throw new RuntimeException(e);
} catch (Throwable e) {
log.error("SocketServiceStart - start error", e);
throw new RuntimeException(e);
}
}
// 返回特定的錯(cuò)誤碼給客戶端
public static void rejectRequest(Socket socket) throws IOException {
OutputStream outputStream = null;
try{
outputStream = socket.getOutputStream();
byte[] bytes = "服務(wù)器太忙了,請(qǐng)稍后重試~".getBytes(Charset.forName("UTF-8"));
SocketClient.segmentWrite(bytes, outputStream);
socket.shutdownOutput();
}finally {
//關(guān)閉流
SocketClient.close(socket,outputStream,null,null,null);
}
}
}我們使用 collectPoll.getQueue().size() >= 1 來(lái)判斷目前服務(wù)端是否已經(jīng)到達(dá)處理的極限了,如果隊(duì)列中有一個(gè)任務(wù)正在排隊(duì),說(shuō)明當(dāng)前服務(wù)端已經(jīng)超負(fù)荷運(yùn)行了,新的請(qǐng)求應(yīng)該拒絕掉,如果隊(duì)列中沒(méi)有數(shù)據(jù),說(shuō)明服務(wù)端還可以接受新的請(qǐng)求。
以上代碼注釋詳細(xì),就不累贅說(shuō)了。
3.2、服務(wù)端任務(wù)的處理邏輯
服務(wù)端的處理邏輯比較簡(jiǎn)單,主要步驟是:從客戶端的 Socket 中讀取輸入,進(jìn)行處理,把響應(yīng)返回給客戶端。
我們使用線程沉睡 2 秒來(lái)模擬服務(wù)端的處理邏輯,代碼如下:
public class SocketService implements Runnable {
private Socket socket;
public SocketService() {
}
public SocketService(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
log.info("SocketService 服務(wù)端任務(wù)開(kāi)始執(zhí)行");
OutputStream outputStream = null;
InputStream is = null;
InputStreamReader isr = null;
BufferedReader br = null;
try {
//接受消息
socket.setSoTimeout(10000);// 10秒還沒(méi)有得到數(shù)據(jù),直接斷開(kāi)連接
is = socket.getInputStream();
isr = new InputStreamReader(is,"UTF-8");
br = new BufferedReader(isr);
StringBuffer sb = SocketClient.segmentRead(br);
socket.shutdownInput();
log.info("SocketService accept info is {}", sb.toString());
//服務(wù)端處理 模擬服務(wù)端處理耗時(shí)
Thread.sleep(2000);
String response = sb.toString();
//返回處理結(jié)果給客戶端
outputStream = socket.getOutputStream();
byte[] bytes = response.getBytes(Charset.forName("UTF-8"));
SocketClient.segmentWrite(bytes, outputStream);
socket.shutdownOutput();
//關(guān)閉流
SocketClient.close(socket,outputStream,isr,br,is);
log.info("SocketService 服務(wù)端任務(wù)執(zhí)行完成");
} catch (IOException e) {
log.error("SocketService IOException", e);
} catch (Exception e) {
log.error("SocketService Exception", e);
} finally {
try {
SocketClient.close(socket,outputStream,isr,br,is);
} catch (IOException e) {
log.error("SocketService IOException", e);
}
}
}
}4、測(cè)試
測(cè)試的時(shí)候,我們必須先啟動(dòng)服務(wù)端,然后再啟動(dòng)客戶端,首先我們啟動(dòng)服務(wù)端,打印日志如下:

接著我們啟動(dòng)客戶端,打印日志如下:

我們最后看一下服務(wù)端的運(yùn)行日志:

從以上運(yùn)行結(jié)果中,我們可以看出得出的結(jié)果是符合我們預(yù)期的,服務(wù)端在請(qǐng)求高峰時(shí),能夠并發(fā)處理5個(gè)請(qǐng)求,其余請(qǐng)求可以用正確的提示進(jìn)行拒絕。
5、總結(jié)
所以代碼集中在 SocketClient、SocketServiceStart、SocketService 中,啟動(dòng)的順序?yàn)橄葐?dòng) SocketServiceStart,后運(yùn)行 SocketClient,感興趣的同學(xué)可以自己 debug 下,加深印象。
以上就是Socket結(jié)合線程池實(shí)現(xiàn)客戶端和服務(wù)端通信實(shí)戰(zhàn)demo的詳細(xì)內(nèi)容,更多關(guān)于Socket線程池客戶端與服務(wù)端通信demo的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
springboot不掃描@repository的問(wèn)題及解決
這篇文章主要介紹了springboot不掃描@repository的問(wèn)題及解決,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-05-05
Java 動(dòng)態(tài)加載jar和class文件實(shí)例解析
這篇文章主要介紹了Java 動(dòng)態(tài)加載jar和class文件實(shí)例解析,分享了相關(guān)代碼示例,小編覺(jué)得還是挺不錯(cuò)的,具有一定借鑒價(jià)值,需要的朋友可以參考下2018-02-02
Spring?Boot中使用Swagger3.0.0版本構(gòu)建RESTful?APIs的方法
Swagger?是一個(gè)規(guī)范和完整的框架,用于生成、描述、調(diào)用和可視化?RESTful?風(fēng)格的?Web?服務(wù),這篇文章主要介紹了Spring?Boot中使用Swagger3.0.0版本構(gòu)建RESTful?APIs的方法,需要的朋友可以參考下2022-11-11
SpringBoot?2.7.18?集成?Mybatis?Plus?+?Druid的實(shí)例詳解
Mybatis和MybatisPlus都是流行的持久層框架,MybatisPlus在Mybatis基礎(chǔ)上增加了更多便捷的功能,如自動(dòng)CRUD、分頁(yè)插件等,文章還提到了Entity、Mapper、Service、Controller等組件的基本使用方法,為開(kāi)發(fā)者提供了一套完整的集成方案2024-10-10
java正則表達(dá)式匹配網(wǎng)頁(yè)所有網(wǎng)址和鏈接文字的示例
這篇文章主要介紹了java正則表達(dá)式匹配網(wǎng)頁(yè)所有網(wǎng)址和鏈接文字java正則表達(dá)式匹配,需要的朋友可以參考下2014-03-03
通過(guò)第三方接口發(fā)送短信驗(yàn)證碼/短信通知(推薦)
這篇文章主要介紹了通過(guò)第三方接口發(fā)送短信驗(yàn)證碼/短信通知(推薦)的相關(guān)資料,非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友可以參考下2016-08-08
springboot集成測(cè)試容器重啟問(wèn)題的處理
這篇文章主要介紹了springboot集成測(cè)試容器重啟問(wèn)題的處理,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11
SpringBoot升級(jí)3.2報(bào)錯(cuò)Invalid value type for
這篇文章給大家介紹了SpringBoot升級(jí)3.2報(bào)錯(cuò)Invalid value type for attribute ‘factoryBeanObjectType‘: java.lang.String的解決方案,文中有詳細(xì)的原因分析,需要的朋友可以參考下2023-12-12

