Java線程池Executor用法詳解
線程池類圖

我們最常使用的Executors實(shí)現(xiàn)創(chuàng)建線程池使用線程主要是用上述類圖中提供的類。在上邊的類圖中,包含了一個(gè)Executor框架,它是一個(gè)根據(jù)一組執(zhí)行策略的調(diào)用調(diào)度執(zhí)行和控制異步任務(wù)的框架,目的是提供一種將任務(wù)提交與任務(wù)如何運(yùn)行分離開(kāi)的機(jī)制。它包含了三個(gè)executor接口:
- Executor:運(yùn)行新任務(wù)的簡(jiǎn)單接口
- ExecutorService:擴(kuò)展了Executor,添加了用來(lái)管理執(zhí)行器生命周期和任務(wù)生命周期的方法
- ScheduleExcutorService:擴(kuò)展了ExecutorService,支持Future和定期執(zhí)行任務(wù)
線程池的好處
- 降低資源消耗-重用存在的線程,減少對(duì)象創(chuàng)建、消亡的開(kāi)銷,性能好
- 提高響應(yīng)速度 -可有效控制最大并發(fā)線程數(shù),提高系統(tǒng)資源利用率,同時(shí)可以避免過(guò)多資源競(jìng)爭(zhēng),避免阻塞。當(dāng)任務(wù)到達(dá)時(shí),任務(wù)可不用等待線程創(chuàng)建就能立即執(zhí)行
- 提高線程的可管理性-提供定時(shí)執(zhí)行、定期執(zhí)行、單線程、并發(fā)數(shù)控制等功能。
new Thread的弊端
- 每次new Thread 新建對(duì)象,性能差
- 線程缺乏統(tǒng)一管理,可能無(wú)限制的新建線程,相互競(jìng)爭(zhēng),可能占用過(guò)多的系統(tǒng)資源導(dǎo)致死機(jī)或者OOM(out of memory 內(nèi)存溢出),這種問(wèn)題的原因不是因?yàn)閱渭兊膎ew一個(gè)Thread,而是可能因?yàn)槌绦虻腷ug或者設(shè)計(jì)上的缺陷導(dǎo)致不斷new Thread造成的。
- 缺少更多功能,如更多執(zhí)行、定期執(zhí)行、線程中斷。
線程池核心類-ThreadPoolExecutor
參數(shù)說(shuō)明:ThreadPoolExecutor一共有七個(gè)參數(shù),這七個(gè)參數(shù)配合起來(lái),構(gòu)成了線程池強(qiáng)大的功能。
corePoolSize:核心線程數(shù)量
maximumPoolSize:線程最大線程數(shù)
workQueue:阻塞隊(duì)列,存儲(chǔ)等待執(zhí)行的任務(wù),很重要,會(huì)對(duì)線程池運(yùn)行過(guò)程產(chǎn)生重大影響
當(dāng)我們提交一個(gè)新的任務(wù)到線程池,線程池會(huì)根據(jù)當(dāng)前池中正在運(yùn)行的線程數(shù)量來(lái)決定該任務(wù)的處理方式。處理方式有三種:
1、直接切換(SynchronusQueue)
2、無(wú)界隊(duì)列(LinkedBlockingQueue)能夠創(chuàng)建的最大線程數(shù)為corePoolSize,這時(shí)maximumPoolSize就不會(huì)起作用了。當(dāng)線程池中所有的核心線程都是運(yùn)行狀態(tài)的時(shí)候,新的任務(wù)提交就會(huì)放入等待隊(duì)列中。
3、有界隊(duì)列(ArrayBlockingQueue)最大maximumPoolSize,能夠降低資源消耗,但是這種方式使得線程池對(duì)線程調(diào)度變的更困難。因?yàn)榫€程池與隊(duì)列容量都是有限的。所以想讓線程池的吞吐率和處理任務(wù)達(dá)到一個(gè)合理的范圍,又想使我們的線程調(diào)度相對(duì)簡(jiǎn)單,并且還盡可能降低資源的消耗,我們就需要合理的限制這兩個(gè)數(shù)量 分配技巧: [如果想降低資源的消耗包括降低cpu使用率、操作系統(tǒng)資源的消耗、上下文切換的開(kāi)銷等等,可以設(shè)置一個(gè)較大的隊(duì)列容量和較小的線程池容量,這樣會(huì)降低線程池的吞吐量。如果我們提交的任務(wù)經(jīng)常發(fā)生阻塞,我們可以調(diào)整maximumPoolSize。如果我們的隊(duì)列容量較小,我們需要把線程池大小設(shè)置的大一些,這樣cpu的使用率相對(duì)來(lái)說(shuō)會(huì)高一些。但是如果線程池的容量設(shè)置的過(guò)大,提高任務(wù)的數(shù)量過(guò)多的時(shí)候,并發(fā)量會(huì)增加,那么線程之間的調(diào)度就是一個(gè)需要考慮的問(wèn)題。這樣反而可能會(huì)降低處理任務(wù)的吞吐量。]
keepAliveTime:線程沒(méi)有任務(wù)執(zhí)行時(shí)最多保持多久時(shí)間終止(當(dāng)線程中的線程數(shù)量大于corePoolSize的時(shí)候,如果這時(shí)沒(méi)有新的任務(wù)提交核心線程外的線程不會(huì)立即銷毀,而是等待,直到超過(guò)keepAliveTime)
unit:keepAliveTime的時(shí)間單位
threadFactory:線程工廠,用來(lái)創(chuàng)建線程,有一個(gè)默認(rèn)的工場(chǎng)來(lái)創(chuàng)建線程,這樣新創(chuàng)建出來(lái)的線程有相同的優(yōu)先級(jí),是非守護(hù)線程、設(shè)置好了名稱)
rejectHandler:當(dāng)拒絕處理任務(wù)時(shí)(阻塞隊(duì)列滿)的策略(AbortPolicy默認(rèn)策略直接拋出異常、CallerRunsPolicy用調(diào)用者所在的線程執(zhí)行任務(wù)、DiscardOldestPolicy丟棄隊(duì)列中最靠前的任務(wù)并執(zhí)行當(dāng)前任務(wù)、DiscardPolicy直接丟棄當(dāng)前任務(wù))

corePoolSize、maximumPoolSize、workQueue 三者關(guān)系:如果運(yùn)行的線程數(shù)小于corePoolSize的時(shí)候,直接創(chuàng)建新線程來(lái)處理任務(wù)。即使線程池中的其他線程是空閑的。如果運(yùn)行中的線程數(shù)大于corePoolSize且小于maximumPoolSize時(shí),那么只有當(dāng)workQueue滿的時(shí)候才創(chuàng)建新的線程去處理任務(wù)。如果corePoolSize與maximumPoolSize是相同的,那么創(chuàng)建的線程池大小是固定的。這時(shí)有新任務(wù)提交,當(dāng)workQueue未滿時(shí),就把請(qǐng)求放入workQueue中。等待空線程從workQueue取出任務(wù)。如果workQueue此時(shí)也滿了,那么就使用另外的拒絕策略參數(shù)去執(zhí)行拒絕策略。
初始化方法:由七個(gè)參數(shù)組合成四個(gè)初始化方法

其他方法:
execute(); //提交任務(wù),交給線程池執(zhí)行 submit();//提交任務(wù),能夠返回執(zhí)行結(jié)果 execute+Future shutdown();//關(guān)閉線程池,等待任務(wù)都執(zhí)行完 shutdownNow();//關(guān)閉線程池,不等待任務(wù)執(zhí)行完 getTaskCount();//線程池已執(zhí)行和未執(zhí)行的任務(wù)總數(shù) getCompleteTaskCount();//已完成的任務(wù)數(shù)量 getPoolSize();//線程池當(dāng)前的線程數(shù)量 getActiveCount();//當(dāng)前線程池中正在執(zhí)行任務(wù)的線程數(shù)量
線程池生命周期:

- running:能接受新提交的任務(wù),也能處理阻塞隊(duì)列中的任務(wù)
- shutdown:不能處理新的任務(wù),但是能繼續(xù)處理阻塞隊(duì)列中任務(wù)
- stop:不能接收新的任務(wù),也不處理隊(duì)列中的任務(wù)
- tidying:如果所有的任務(wù)都已經(jīng)終止了,這時(shí)有效線程數(shù)為0
- terminated:最終狀態(tài)
使用Executors創(chuàng)建線程池
使用Executors可以創(chuàng)建四種線程池:分別對(duì)應(yīng)上邊提到的四種線程池初始化方法
Executors.newCachedThreadPool
newCachedThreadPool是一個(gè)根據(jù)需要?jiǎng)?chuàng)建新線程的線程池,當(dāng)一個(gè)任務(wù)提交時(shí),corePoolSize為0不創(chuàng)建核心線程,SynchronousQueue是一個(gè)不存儲(chǔ)元素的隊(duì)列,可以理解為隊(duì)里永遠(yuǎn)是滿的,因此最終會(huì)創(chuàng)建非核心線程來(lái)執(zhí)行任務(wù)。 對(duì)于非核心線程空閑60s時(shí)將被回收。因?yàn)镮nteger.MAX_VALUE非常大,可以認(rèn)為是可以無(wú)限創(chuàng)建線程的,在資源有限的情況下容易引起OOM異常。
//創(chuàng)建newCachedThreadPool線程池源碼
public static ExecutorService newCachedThreadPool() {
/**
*corePoolSize: 0,核心線程池的數(shù)量為0
*maximumPoolSize: Integer.MAX_VALUE,可以認(rèn)為最大線程數(shù)是無(wú)限的
*keepAliveTime: 60L
*unit: 秒
*workQueue: SynchronousQueue
**/
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
使用案例:
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int index = i;
executor.execute(new Runnable() {
@Override
public void run() {
log.info("task:{}",index);
}
});
}
}值得注意的一點(diǎn)是,newCachedThreadPool的返回值是ExecutorService類型,該類型只包含基礎(chǔ)的線程池方法,但卻不包含線程監(jiān)控相關(guān)方法,因此在使用返回值為ExecutorService的線程池類型創(chuàng)建新線程時(shí)要考慮到具體情況。

Executors.newSingleThreadExecutor
newSingleThreadExecutor是單線程線程池,只有一個(gè)核心線程,用唯一的一個(gè)共用線程執(zhí)行任務(wù),保證所有任務(wù)按指定順序執(zhí)行(FIFO、優(yōu)先級(jí)…)
//newSingleThreadExecutor創(chuàng)建線程池源碼
public static ExecutorService newSingleThreadExecutor() {
/**
* corePoolSize : 1,核心線程池的數(shù)量為1
* maximumPoolSize : 1,只可以創(chuàng)建一個(gè)非核心線程
* keepAliveTime : 0L
* unit => 秒
* workQueue => LinkedBlockingQueue
**/
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}當(dāng)一個(gè)任務(wù)提交時(shí),首先會(huì)創(chuàng)建一個(gè)核心線程來(lái)執(zhí)行任務(wù),如果超過(guò)核心線程的數(shù)量,將會(huì)放入隊(duì)列中,因?yàn)長(zhǎng)inkedBlockingQueue是長(zhǎng)度為Integer.MAX_VALUE的隊(duì)列,可以認(rèn)為是無(wú)界隊(duì)列,因此往隊(duì)列中可以插入無(wú)限多的任務(wù),在資源有限的時(shí)候容易引起OOM異常,同時(shí)因?yàn)闊o(wú)界隊(duì)列,maximumPoolSize和keepAliveTime參數(shù)將無(wú)效,壓根就不會(huì)創(chuàng)建非核心線程。
Executors.newFixedThreadPool
定長(zhǎng)線程池,核心線程數(shù)和最大線程數(shù)由用戶傳入,可以設(shè)置線程的最大并發(fā)數(shù),超出在隊(duì)列等待
//newFixedThreadPool創(chuàng)建線程池源碼
public static ExecutorService newFixedThreadPool(int nThreads) {
/**
* corePoolSize : 核心線程的數(shù)量為自定義輸入nThreads
* maximumPoolSize : 最大線程的數(shù)量為自定義輸入nThreads
* keepAliveTime : 0L
* unit : 秒
* workQueue : LinkedBlockingQueue
**/
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}newFixedThreadPool和SingleThreadExecutor類似,唯一的區(qū)別就是核心線程數(shù)不同,并且由于使用的是LinkedBlockingQueue,在資源有限的時(shí)候容易引起OOM異常。
Executors.newScheduledThreadPool
定長(zhǎng)線程池,核心線程數(shù)由用戶傳入,支持定時(shí)和周期任務(wù)執(zhí)行
//newScheduledThreadPool創(chuàng)建線程池源碼
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
/**
* corePoolSize : 核心線程的數(shù)量為自定義輸入corePoolSize
* maximumPoolSize : 最大線程的數(shù)量為Integer.MAX_VALUE
* keepAliveTime : 0L
* unit : 納秒
* workQueue : DelayedWorkQueue
**/
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}當(dāng)一個(gè)任務(wù)提交時(shí),corePoolSize為自定義輸入,首先創(chuàng)建核心線程,核心線程滿了之后,因此最終會(huì)創(chuàng)建非核心線程來(lái)執(zhí)行任務(wù)。非核心線程使用后將被回收。因?yàn)镮nteger.MAX_VALUE非常大,可以認(rèn)為是可以無(wú)限創(chuàng)建線程的,在資源有限的情況下容易引起OOM異常。因?yàn)槭褂玫腄elayedWorkQueue可以實(shí)現(xiàn)定時(shí)和周期任務(wù)。 ScheduledExecutorService提供了三種方法可以使用:

schedule:延遲后執(zhí)行任務(wù) scheduleAtFixedRate:以指定的速率執(zhí)行任務(wù) scheduleWithFixedDelay:以指定的延遲執(zhí)行任務(wù) 使用案例:
public static void main(String[] args) {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
// executorService.schedule(new Runnable() {
// @Override
// public void run() {
// log.warn("schedule run");
// }
// //延遲3秒后執(zhí)行
// }, 3, TimeUnit.SECONDS);
// executorService.shutdown();
// executorService.scheduleWithFixedDelay(new Runnable() {
// @Override
// public void run() {
// log.warn("scheduleWithFixedDelay run");
// }
// //延遲一秒后每隔3秒執(zhí)行
// }, 1, 3, TimeUnit.SECONDS);
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
log.warn("schedule run");
}
//延遲一秒后每隔3秒執(zhí)行
}, 1, 3, TimeUnit.SECONDS);
/**
* 定時(shí)器調(diào)度,不推薦使用,推薦ScheduledExecutorService調(diào)度
*/
// Timer timer = new Timer();
// timer.schedule(new TimerTask() {
// @Override
// public void run() {
// log.warn("timer run");
// }
// //從當(dāng)前時(shí)間每隔5秒執(zhí)行
// }, new Date(), 5 * 1000);
}總結(jié)
- FixedThreadPool和SingleThreadExecutor 允許的請(qǐng)求隊(duì)列長(zhǎng)度為Integer.MAX_VALUE,可能會(huì)堆積大量的請(qǐng)求,從而引起OOM異常
- CachedThreadPool 和newScheduledThreadPool允許創(chuàng)建的線程數(shù)為Integer.MAX_VALUE,可能會(huì)創(chuàng)建大量的線程,從而引起OOM異常
這就是為什么禁止使用Executors去創(chuàng)建線程池,而是推薦自己去創(chuàng)建ThreadPoolExecutor的原因
如何定義線程池參數(shù)
CPU密集型 : 線程池的大小推薦為CPU數(shù)量 + 1,CPU數(shù)量可以根據(jù)Runtime.availableProcessors方法獲取 IO密集型 : CPU數(shù)量 * CPU利用率 * (1 + 線程等待時(shí)間/線程CPU時(shí)間) 混合型 : 將任務(wù)分為CPU密集型和IO密集型,然后分別使用不同的線程池去處理,從而使每個(gè)線程池可以根據(jù)各自的工作負(fù)載來(lái)調(diào)整 阻塞隊(duì)列 : 推薦使用有界隊(duì)列,有界隊(duì)列有助于避免資源耗盡的情況發(fā)生 拒絕策略 : 默認(rèn)采用的是AbortPolicy拒絕策略,直接在程序中拋出RejectedExecutionException異?!疽?yàn)槭沁\(yùn)行時(shí)異常,不強(qiáng)制catch】,這種處理方式不夠優(yōu)雅。處理拒絕策略有以下幾種比較推薦:
- 在程序中捕獲RejectedExecutionException異常,在捕獲異常中對(duì)任務(wù)進(jìn)行處理。針對(duì)默認(rèn)拒絕策略
- 使用CallerRunsPolicy拒絕策略,該策略會(huì)將任務(wù)交給調(diào)用execute的線程執(zhí)行【一般為主線程】,此時(shí)主線程將在一段時(shí)間內(nèi)不能提交任何任務(wù),從而使工作線程處理正在執(zhí)行的任務(wù)。此時(shí)提交的線程將被保存在TCP隊(duì)列中,TCP隊(duì)列滿將會(huì)影響客戶端,這是一種平緩的性能降低
- 自定義拒絕策略,只需要實(shí)現(xiàn)RejectedExecutionHandler接口即可
- 如果任務(wù)不是特別重要,使用DiscardPolicy和DiscardOldestPolicy拒絕策略將任務(wù)丟棄也是可以的
如果使用Executors的靜態(tài)方法創(chuàng)建ThreadPoolExecutor對(duì)象,可以通過(guò)使用Semaphore對(duì)任務(wù)的執(zhí)行進(jìn)行限流也可以避免出現(xiàn)OOM異常
到此這篇關(guān)于Java線程池Executor用法詳解的文章就介紹到這了,更多相關(guān)Java線程池Executor內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
解析Java的Jackson庫(kù)中對(duì)象的序列化與數(shù)據(jù)泛型綁定
這篇文章主要介紹了解析Java的Jackson庫(kù)中對(duì)象的序列化與數(shù)據(jù)泛型綁定,Jackson通常被用來(lái)實(shí)現(xiàn)Java對(duì)象和JSON數(shù)據(jù)的相互轉(zhuǎn)換功能,需要的朋友可以參考下2016-01-01
Spring Boot實(shí)現(xiàn)對(duì)文件進(jìn)行壓縮下載功能
在Web應(yīng)用中,文件下載功能是一個(gè)常見(jiàn)的需求,特別是當(dāng)你需要提供用戶下載各種類型的文件時(shí),本文將演示如何使用Spring Boot框架來(lái)實(shí)現(xiàn)一個(gè)簡(jiǎn)單而強(qiáng)大的文件下載功能,需要的朋友跟隨小編一起學(xué)習(xí)吧2023-09-09
使用Java通過(guò)OAuth協(xié)議驗(yàn)證發(fā)送微博的教程
這篇文章主要介紹了使用Java通過(guò)OAuth協(xié)議驗(yàn)證發(fā)送微博的教程,使用到了新浪微博為Java開(kāi)放的API weibo4j,需要的朋友可以參考下2016-02-02
java中初始化MediaRecorder的實(shí)現(xiàn)方法
這篇文章主要介紹了java中初始化MediaRecorder的實(shí)現(xiàn)方法的相關(guān)資料,希望通過(guò)本文能幫助到大家,讓大家實(shí)現(xiàn)這樣的功能,需要的朋友可以參考下2017-10-10
druid?handleException執(zhí)行流程源碼解析
這篇文章主要為大家介紹了druid?handleException執(zhí)行流程源碼解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-09-09
使用java實(shí)現(xiàn)手機(jī)短信驗(yàn)證全過(guò)程
這篇文章主要介紹了使用java實(shí)現(xiàn)手機(jī)短信驗(yàn)證全過(guò)程,文中有非常詳細(xì)的代碼示例,對(duì)正在學(xué)習(xí)java的小伙伴們有非常好的幫助,需要的朋友可以參考下2021-04-04
java實(shí)現(xiàn)身份證號(hào)碼驗(yàn)證的示例代碼
這篇文章主要為大家詳細(xì)介紹了如何利用java語(yǔ)言實(shí)現(xiàn)身份證號(hào)碼驗(yàn)證的功能,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2023-09-09

