支持生產(chǎn)阻塞的Java線程池
通常來(lái)說(shuō),生產(chǎn)任務(wù)的速度要大于消費(fèi)的速度。一個(gè)細(xì)節(jié)問(wèn)題是,隊(duì)列長(zhǎng)度,以及如何匹配生產(chǎn)和消費(fèi)的速度。
一個(gè)典型的生產(chǎn)者-消費(fèi)者模型如下:
![]() |
對(duì)于一般的生產(chǎn)快于消費(fèi)的情況。當(dāng)隊(duì)列已滿時(shí),我們并不希望有任何任務(wù)被忽略或得不到執(zhí)行,此時(shí)生產(chǎn)者可以等待片刻再提交任務(wù),更好的做法是,把生產(chǎn)者阻塞在提交任務(wù)的方法上,待隊(duì)列未滿時(shí)繼續(xù)提交任務(wù),這樣就沒(méi)有浪費(fèi)的空轉(zhuǎn)時(shí)間了。阻塞這一點(diǎn)也很容易,BlockingQueue就是為此打造的,ArrayBlockingQueue和LinkedBlockingQueue在構(gòu)造時(shí)都可以提供容量做限制,其中LinkedBlockingQueue是在實(shí)際操作隊(duì)列時(shí)在每次拿到鎖以后判斷容量。
更進(jìn)一步,當(dāng)隊(duì)列為空時(shí),消費(fèi)者拿不到任務(wù),可以等一會(huì)兒再拿,更好的做法是,用BlockingQueue的take方法,阻塞等待,當(dāng)有任務(wù)時(shí)便可以立即獲得執(zhí)行,建議調(diào)用take的帶超時(shí)參數(shù)的重載方法,超時(shí)后線程退出。這樣當(dāng)生產(chǎn)者事實(shí)上已經(jīng)停止生產(chǎn)時(shí),不至于讓消費(fèi)者無(wú)限等待。
于是一個(gè)高效的支持阻塞的生產(chǎn)消費(fèi)模型就實(shí)現(xiàn)了。
等一下,既然J.U.C已經(jīng)幫我們實(shí)現(xiàn)了線程池,為什么還要采用這一套東西?直接用ExecutorService不是更方便?
我們來(lái)看一下ThreadPoolExecutor的基本結(jié)構(gòu):
![]() |
但問(wèn)題在于,即便你在構(gòu)造ThreadPoolExecutor時(shí)手動(dòng)指定了一個(gè)BlockingQueue作為隊(duì)列實(shí)現(xiàn),事實(shí)上當(dāng)隊(duì)列滿時(shí),execute方法并不會(huì)阻塞,原因在于ThreadPoolExecutor調(diào)用的是BlockingQueue非阻塞的offer方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}
這時(shí)候就需要做一些事情來(lái)達(dá)成一個(gè)結(jié)果:當(dāng)生產(chǎn)者提交任務(wù),而隊(duì)列已滿時(shí),能夠讓生產(chǎn)者阻塞住,等待任務(wù)被消費(fèi)。
關(guān)鍵在于,在并發(fā)環(huán)境下,隊(duì)列滿不能由生產(chǎn)者去判斷,不能調(diào)用ThreadPoolExecutor.getQueue().size()來(lái)判斷隊(duì)列是否滿。
線程池的實(shí)現(xiàn)中,當(dāng)隊(duì)列滿時(shí)會(huì)調(diào)用構(gòu)造時(shí)傳入的RejectedExecutionHandler去拒絕任務(wù)的處理。默認(rèn)的實(shí)現(xiàn)是AbortPolicy,直接拋出一個(gè)RejectedExecutionException。
幾種拒絕策略在這里就不贅述了,這里和我們的需求比較接近的是CallerRunsPolicy,這種策略會(huì)在隊(duì)列滿時(shí),讓提交任務(wù)的線程去執(zhí)行任務(wù),相當(dāng)于讓生產(chǎn)者臨時(shí)去干了消費(fèi)者干的活兒,這樣生產(chǎn)者雖然沒(méi)有被阻塞,但提交任務(wù)也會(huì)被暫停。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a <tt>CallerRunsPolicy</tt>.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
但這種策略也有隱患,當(dāng)生產(chǎn)者較少時(shí),生產(chǎn)者消費(fèi)任務(wù)的時(shí)間里,消費(fèi)者可能已經(jīng)把任務(wù)都消費(fèi)完了,隊(duì)列處于空狀態(tài),當(dāng)生產(chǎn)者執(zhí)行完任務(wù)后才能再繼續(xù)生產(chǎn)任務(wù),這個(gè)過(guò)程中可能導(dǎo)致消費(fèi)者線程的饑餓。
參考類似的思路,最簡(jiǎn)單的做法,我們可以直接定義一個(gè)RejectedExecutionHandler,當(dāng)隊(duì)列滿時(shí)改為調(diào)用BlockingQueue.put來(lái)實(shí)現(xiàn)生產(chǎn)者的阻塞:
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
// should not be interrupted
}
}
}
};
這樣,我們就無(wú)需再關(guān)心Queue和Consumer的邏輯,只要把精力集中在生產(chǎn)者和消費(fèi)者線程的實(shí)現(xiàn)邏輯上,只管往線程池提交任務(wù)就行了。
相比最初的設(shè)計(jì),這種方式的代碼量能減少不少,而且能避免并發(fā)環(huán)境的很多問(wèn)題。當(dāng)然,你也可以采用另外的手段,例如在提交時(shí)采用信號(hào)量做入口限制等,但是如果僅僅是要讓生產(chǎn)者阻塞,那就顯得復(fù)雜了。
- Java線程池用法實(shí)戰(zhàn)案例分析
- Java線程池運(yùn)行狀態(tài)監(jiān)控實(shí)現(xiàn)解析
- Java線程池的拒絕策略實(shí)現(xiàn)詳解
- Java中Future、FutureTask原理以及與線程池的搭配使用
- 到底如何設(shè)置Java線程池的大小的方法示例
- java線程池實(shí)現(xiàn)批量下載文件
- Java8并行流中自定義線程池操作示例
- Java ThreadPoolExecutor 線程池的使用介紹
- JAVA線程池原理實(shí)例詳解
- Java線程池的幾種實(shí)現(xiàn)方法及常見(jiàn)問(wèn)題解答
- Java線程池的應(yīng)用實(shí)例分析
相關(guān)文章
Spring啟動(dòng)過(guò)程中實(shí)例化部分代碼的分析之Bean的推斷構(gòu)造方法
這篇文章主要介紹了Spring啟動(dòng)過(guò)程中實(shí)例化部分代碼的分析之Bean的推斷構(gòu)造方法,實(shí)例化這一步便是在doCreateBean方法的?instanceWrapper?=?createBeanInstance(beanName,?mbd,?args);這段代碼中,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),需要的朋友參考下吧2022-09-09
Java編程之多線程死鎖與線程間通信簡(jiǎn)單實(shí)現(xiàn)代碼
這篇文章主要介紹了Java編程之多線程死鎖與線程間通信簡(jiǎn)單實(shí)現(xiàn)代碼,具有一定參考價(jià)值,需要的朋友可以了解下。2017-10-10
淺析Java中Map與HashMap,Hashtable,HashSet的區(qū)別
HashMap和Hashtable兩個(gè)類都實(shí)現(xiàn)了Map接口,二者保存K-V對(duì)(key-value對(duì));HashSet則實(shí)現(xiàn)了Set接口,性質(zhì)類似于集合2013-09-09
SpringBoot使用ApplicationEvent&Listener完成業(yè)務(wù)解耦
這篇文章主要介紹了SpringBoot使用ApplicationEvent&Listener完成業(yè)務(wù)解耦示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-05-05
SpringBoot集成pf4j實(shí)現(xiàn)插件開(kāi)發(fā)功能的代碼示例
pf4j是一個(gè)插件框架,用于實(shí)現(xiàn)插件的動(dòng)態(tài)加載,支持的插件格式(zip、jar),本文給大家介紹了SpringBoot集成pf4j實(shí)現(xiàn)插件開(kāi)發(fā)功能的示例,文中通過(guò)代碼示例給大家講解的非常詳細(xì),需要的朋友可以參考下2024-07-07
java request.getParameter中文亂碼解決方法
今天跟大家分享幾個(gè)解決java Web開(kāi)發(fā)中,request.getParameter()獲取URL中文參數(shù)亂碼的解決辦法,需要的朋友可以參考下2020-02-02
解析MapStruct轉(zhuǎn)換javaBean時(shí)出現(xiàn)的詭異事件
在項(xiàng)目中用到了MapStruct,對(duì)其可以轉(zhuǎn)換JavaBean特別好奇,今天小編給大家分享一個(gè)demo給大家講解MapStruct轉(zhuǎn)換javaBean時(shí)出現(xiàn)的詭異事件,感興趣的朋友一起看看吧2021-09-09
不規(guī)范使用ThreadLocal導(dǎo)致bug分析解決
這篇文章主要為大家介紹了不規(guī)范使用ThreadLocal導(dǎo)致bug分析解決,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-01-01



