線程池ThreadPoolExecutor應(yīng)用過(guò)程
一個(gè)老生常談的話題:線程的創(chuàng)建及銷毀是非常消耗時(shí)間及資源的,所以線程應(yīng)該交由線程池去執(zhí)行。
ThreadPoolExecutor構(gòu)造說(shuō)明及常用方法
ThreadPoolExecutor提供了很多構(gòu)造方法,這里主要說(shuō)以下這個(gè),其他構(gòu)造方法都是基于此方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)- 參數(shù)int corePoolSize:核心線程池大小,不能小于0;
- 參數(shù)int maximumPoolSize:最大線程池大小(核心線程數(shù)+臨時(shí)線程數(shù)),不能小于等于0且不能小于corePoolSize參數(shù),線程池的大小會(huì)在corePoolSize與maximumPoolSize之間動(dòng)態(tài)變化;
- 參數(shù)long keepAliveTime:臨時(shí)線程(非核心線程,核心線程空閑之后不會(huì)被銷毀)最大空閑時(shí)間,不能小于0;
- 參數(shù)TimeUnit unit:keepAliveTime參數(shù)的時(shí)間單位;
- 參數(shù)BlockingQueue<Runnable> workQueue:線程等待隊(duì)列;
- 參數(shù)ThreadFactory threadFactory:線程創(chuàng)建工廠;
- 參數(shù)RejectedExecutionHandler handler:拒絕策略;
下面是構(gòu)造ThreadPoolExecutor的參數(shù)校驗(yàn):

其他常用方法:
- int prestartAllCoreThreads():根據(jù)線程核心數(shù),啟動(dòng)所有的核心線程,讓它們?nèi)刻幱诳臻e狀態(tài)等待工作
- boolean prestartCoreThread():隨機(jī)啟動(dòng)一個(gè)核心線程,當(dāng)所有核心線程都已經(jīng)啟動(dòng)的時(shí)候,返回false(表示未啟動(dòng)到有效資源);
- void allowCoreThreadTimeOut(boolean value):設(shè)置是否允許臨時(shí)線程永久空閑存活,設(shè)置true后,keepAliveTime參數(shù)失效;
- boolean allowsCoreThreadTimeOut():獲取是否允許臨時(shí)線程永久空閑存活;
- void shutdown():銷毀線程池,此前還存在于線程池的任務(wù)依然會(huì)被執(zhí)行;
- List<Runnable> shutdownNow():銷毀線程池,與shutdown()的區(qū)別是,未執(zhí)行完成的線程會(huì)被標(biāo)記為中斷;
參數(shù)舉例說(shuō)明:
1、當(dāng)創(chuàng)建一個(gè)任務(wù)并交給線程池執(zhí)行執(zhí)行后,會(huì)從消耗一個(gè)核心線程資源,當(dāng)該任務(wù)執(zhí)行完畢之后,會(huì)變成一個(gè)空閑線程,另外,如果核心線程數(shù)足以支撐任務(wù)運(yùn)行,即使有核心空閑線程,依然會(huì)創(chuàng)建一個(gè)新的核心線程資源;
2、當(dāng)加入線程池的任務(wù)超過(guò)核心線程數(shù),會(huì)進(jìn)入workQueue中等待執(zhí)行;
3、當(dāng)加入線程池的任務(wù)超過(guò)核心線程數(shù),溢出的任務(wù)數(shù)如果超過(guò)等待隊(duì)列長(zhǎng)度,會(huì)直接創(chuàng)建大于corePoolSize且小于maximumPoolSize的線程資源用于執(zhí)行任務(wù),超過(guò)核心線程數(shù)的部分稱為臨時(shí)線程;
4、keepAliveTime:當(dāng)線程池被擴(kuò)大到corePoolSize與maximumPoolSize之間之后,當(dāng)執(zhí)行完所有任務(wù)后,不再有新的任務(wù)進(jìn)來(lái),超出corePoolSize的部分會(huì)作為空閑線程,會(huì)在keepAliveTime指定時(shí)間后,線程池大小縮小為corePoolSize,如果設(shè)置allowCoreThreadTimeOut(true),則空閑線程一直不會(huì)銷毀;
5、BlockingQueue<Runnable> workQueue:用于保存等待執(zhí)行的任務(wù)的阻塞隊(duì)列接口,java提供了以下實(shí)現(xiàn)類:

線程池中,BlockingQueue是典型的"生產(chǎn)者消費(fèi)者"模型:生產(chǎn)者調(diào)用插入元素的方法(add/offer/put等方法),消費(fèi)者調(diào)用移除元素的方法(remove/poll/take等方法),其中,生產(chǎn)者插入和消費(fèi)者移除都有阻塞和非阻塞的方法,offer/poll是非阻塞方法,put/take是阻塞方法(隊(duì)列后面單獨(dú)再寫);
ThreadPoolExecutor提交任務(wù)采用的是不阻塞的offer方法,從隊(duì)列中獲取任務(wù)采用的是阻塞的take方法,正是因?yàn)門hreadPoolExecutor使用了不阻塞的offer方法,所以當(dāng)隊(duì)列容量已滿,線程池會(huì)去創(chuàng)建新的臨時(shí)線程,去處理隊(duì)列中溢出的任務(wù)。
- ArrayBlockingQueue:是一個(gè)基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列,此隊(duì)列按 FIFO(先進(jìn)先出)原則對(duì)元素進(jìn)行排序,允許迭代中修改隊(duì)列(刪除元素時(shí)會(huì)更新迭代器),當(dāng)然,一般也不會(huì)有場(chǎng)景需要迭代阻塞隊(duì)列;
- LinkedBlockingQueue:一個(gè)基于鏈表結(jié)構(gòu)的默認(rèn)無(wú)界雙端阻塞隊(duì)列,雙端意味著可以像普通隊(duì)列一樣FIFO(先進(jìn)先出),可以以像棧一樣FILO(先進(jìn)后出)。吞吐量通常要高于ArrayBlockingQueue(如無(wú)需在迭代中修改元素,優(yōu)于ArrayBlockingQueue,同時(shí)記得指定隊(duì)列長(zhǎng)度,提供了一個(gè)可選有界的構(gòu)造函數(shù),而在未指明容量時(shí),容量默認(rèn)為Integer.MAX_VALUE);
- SynchronousQueue:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列,所謂不存儲(chǔ)元素,正如名字描述,同步隊(duì)列,每個(gè)插入操作必須等到另一個(gè)線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),即生產(chǎn)一個(gè),就消費(fèi)一個(gè),因此它也被認(rèn)為是無(wú)界的隊(duì)列,吞吐量通常要高于LinkedBlockingQueue。同時(shí),SynchronousQueue不能遍歷,因?yàn)樗鼪]有元素可以遍歷;
- DelayQueue:使用二叉堆實(shí)現(xiàn)的優(yōu)先級(jí)阻塞隊(duì)列,通過(guò)執(zhí)行時(shí)延從隊(duì)列中提取任務(wù),時(shí)間沒到任務(wù)取不出來(lái),可用于實(shí)現(xiàn)有延時(shí)場(chǎng)景的需求;
- PriorityBlockingQueue:使用二叉堆實(shí)現(xiàn)的優(yōu)先級(jí)阻塞隊(duì)列,可以實(shí)現(xiàn)Comparable接口也可以提供Comparator來(lái)對(duì)隊(duì)列中的元素進(jìn)行比較,跟時(shí)間沒有任何關(guān)系,僅僅是按照優(yōu)先級(jí)取任務(wù),如果有根據(jù)某個(gè)規(guī)則優(yōu)先取出任務(wù)的場(chǎng)景,適用;
綜上,需要根據(jù)每個(gè)隊(duì)列的特點(diǎn)選擇適合自己場(chǎng)景的隊(duì)列。
6、RejectedExecutionHandler handler:當(dāng)隊(duì)列和線程池都滿了(從上面可以看出,一個(gè)線程池可以容納的的最大任務(wù)數(shù)是maximumPoolSize+隊(duì)列長(zhǎng)度,因此使用無(wú)界隊(duì)列會(huì)造成隊(duì)列永不會(huì)滿的情況,一旦控制不好就容易出現(xiàn)OOM),說(shuō)明線程池處于飽和狀態(tài),那么必須采取一種策略處理提交的新任務(wù)。
1、AbortPolicy:直接拋出異常;
2、CallerRunsPolicy:只用調(diào)用者所在線程來(lái)運(yùn)行任務(wù);
3、DiscardOldestPolicy:丟棄隊(duì)列里最近的一個(gè)任務(wù),并執(zhí)行當(dāng)前任務(wù);
4、DiscardPolicy:不處理,丟棄掉;
5、也可以根據(jù)應(yīng)用場(chǎng)景需要來(lái)實(shí)現(xiàn)RejectedExecutionHandler接口自定義策略。如記錄日志或持久化不能處理的任務(wù);
注:AbortPolicy/CallerRunsPolicy/DiscardOldestPolicy/DiscardPolicy都是ThreadPoolExecutor的內(nèi)部類,如下:

所以,實(shí)例化需要用內(nèi)部類的方式,如下:
AbortPolicy abortPolicy = new ThreadPoolExecutor.AbortPolicy();
自定義RejectedExecutionHandler拒絕策略示例:
public class RejectedExecutionHandlerTest implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 啥邏輯都沒有,表示忽略,比如DiscardPolicy
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
// TODO 記錄錯(cuò)誤日志等
e.printStackTrace();
}
}
}ThreadPoolExecutor的特點(diǎn)總結(jié)起來(lái)就是以下4點(diǎn):
1、當(dāng)有任務(wù)提交的時(shí)候,會(huì)創(chuàng)建核心線程去執(zhí)行任務(wù)(即使有核心線程空閑);
2、當(dāng)核心線程數(shù)達(dá)到corePoolSize時(shí),后續(xù)提交的都會(huì)進(jìn)BlockingQueue中排隊(duì);
3、當(dāng)BlockingQueue滿了(offer失敗),就會(huì)創(chuàng)建臨時(shí)線程(臨時(shí)線程空閑超過(guò)一定時(shí)間后,會(huì)被銷毀),其中臨時(shí)線程最大數(shù)量=maximumPoolSize - corePoolSize,空閑最大時(shí)間由keepAliveTime控制,如果設(shè)置allowCoreThreadTimeOut(true),臨時(shí)線程永不銷毀;
4、當(dāng)線程總數(shù)達(dá)到maximumPoolSize 時(shí),后續(xù)提交的任務(wù)都會(huì)被RejectedExecutionHandler拒絕。
為什么強(qiáng)制要求使用ThreadPoolExecutor創(chuàng)建線程池
在Executor類中,提供了以下場(chǎng)景的創(chuàng)建線程池的方法:
1、newCachedThreadPool,特點(diǎn):必要時(shí)創(chuàng)建新線程,空閑線程會(huì)保留60s;
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}根據(jù)上面的ThreadPoolExecutor,允許的創(chuàng)建線程數(shù)量為 Integer.MAX_VALUE,可能會(huì)創(chuàng)建大量的線程,從而導(dǎo)致 OOM,還無(wú)法指定拒絕策略。
2、newFixedThreadPool,特點(diǎn):包含固定的線程數(shù),空閑線程會(huì)被一直保留;
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}允許的請(qǐng)求隊(duì)列長(zhǎng)度為 Integer.MAX_VALUE,可能會(huì)堆積大量的請(qǐng)求,從而導(dǎo)致 OOM,還無(wú)法指定拒絕策略。
3、newSingleThreadExecutor,特點(diǎn):只有一個(gè)線程的池,順序執(zhí)行每一個(gè)提交的任務(wù);
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}允許的請(qǐng)求隊(duì)列長(zhǎng)度為 Integer.MAX_VALUE,可能會(huì)堆積大量的請(qǐng)求,從而導(dǎo)致 OOM,還無(wú)法指定拒絕策略。
4、ScheduledThreadPoolExecutor,特點(diǎn):用于構(gòu)建具有延時(shí)隊(duì)列的線程池,空閑線程一直被保留;
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}允許的創(chuàng)建線程數(shù)量為 Integer.MAX_VALUE,可能會(huì)創(chuàng)建大量的線程,從而導(dǎo)致 OOM。
............省略Executors其他創(chuàng)建線程的方法。
從上面可以看到,使用Executors創(chuàng)建的線程池,雖然都是ThreadPoolExecutor,但是靈活度都不高,且創(chuàng)建出來(lái)的是具有一定特色的線程池,使用ThreadPoolExecutor類本身去創(chuàng)建線程池,除了可以更加明確線程池的運(yùn)行規(guī)則,還能更多的規(guī)避資源耗盡的風(fēng)險(xiǎn),畢竟我命由我不由天。
當(dāng)然,ThreadPoolExecutor提供了相關(guān)參數(shù)的set方法,如果一定要用Executors去創(chuàng)建,那么記得調(diào)整一下相關(guān)參數(shù),避免大量任務(wù)堆積,產(chǎn)生OOM。
在項(xiàng)目中創(chuàng)建線程池
如果需要在項(xiàng)目中創(chuàng)建線程池,那么應(yīng)該將它設(shè)置成單例模式。
示例,用枚舉的方式創(chuàng)建單例:
// 線程工廠
public class UserThreadFactory implements ThreadFactory {
// 線程組命名標(biāo)識(shí)
private final String namePrefix;
// 線程編號(hào)
private final AtomicInteger nextId = new AtomicInteger(1);
// 定義線程組名稱,在 jstack 問(wèn)題排查時(shí),非常有幫助
UserThreadFactory(String whatFeaturOfGroup) {
namePrefix = "From UserThreadFactory's " + whatFeaturOfGroup + "-Worker-";
}
@Override
public Thread newThread(Runnable task) {
String name = namePrefix + nextId.getAndIncrement();
Thread thread = new Thread(task, name);
return thread;
}
}
// 單例線程池
public enum ThreadPoolEnum {
INSTANCE;
private ThreadPoolExecutor threadPoolExecutor;
// 枚舉的特性,在JVM中只會(huì)被實(shí)例化一次
private ThreadPoolEnum() {
threadPoolExecutor = new ThreadPoolExecutor(2,
4, 10, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(),
new UserThreadFactory("first-threadPool"),
new ThreadPoolExecutor.DiscardPolicy());
}
public ThreadPoolExecutor getInstance() {
return threadPoolExecutor;
}
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor1 = ThreadPoolEnum.INSTANCE.getInstance();
ThreadPoolExecutor executor2 = ThreadPoolEnum.INSTANCE.getInstance();
System.out.println(executor1 == executor2);
Runnable r = () -> {
System.out.println("新創(chuàng)建的線程任務(wù),交由線程池執(zhí)行");
System.out.println(Thread.currentThread().getName());
};
executor1.execute(r);
// 銷毀線程池
executor1.shutdown();
}
}執(zhí)行結(jié)果:

在Spring中創(chuàng)建線程池
@Configuration
public class ThreadPoolConfig {
// 線程池的參數(shù)配置可由外部配置引入
@Bean(destroyMethod = "shutdown")
public ThreadPoolExecutor initThreadPoolExecutor() {
return new ThreadPoolExecutor(2,
4, 10, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(),
new ThreadPoolExecutor.DiscardPolicy());
}
}總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Java關(guān)于遠(yuǎn)程調(diào)試程序教程(以Eclipse為例)
這篇文章主要介紹了Java關(guān)于遠(yuǎn)程調(diào)試程序教程(以Eclipse為例),小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-06-06
Java的MyBatis框架項(xiàng)目搭建與hellow world示例
MyBatis框架為Java程序的數(shù)據(jù)庫(kù)操作帶來(lái)了很大的便利,這里我們就從最基礎(chǔ)的入手,來(lái)看一下Java的MyBatis框架項(xiàng)目搭建與hellow world示例,需要的朋友可以參考下2016-06-06
java批量導(dǎo)入導(dǎo)出文件的實(shí)例分享(兼容xls,xlsx)
這篇文章主要給大家介紹了利用java批量導(dǎo)入導(dǎo)出文件的相關(guān)資料,文中給出了詳細(xì)的實(shí)例代碼,并且兼容xls,xlsx,對(duì)大家具有一定的參考學(xué)習(xí)價(jià)值,下面跟著小編一起來(lái)看看詳細(xì)的介紹吧。2017-06-06
Springboot Session共享實(shí)現(xiàn)原理及代碼實(shí)例
這篇文章主要介紹了Springboot Session共享實(shí)現(xiàn)原理及代碼實(shí)例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-08-08
Java timezone設(shè)置和mybatis連接數(shù)據(jù)庫(kù)時(shí)區(qū)設(shè)置方式
這篇文章主要介紹了Java timezone設(shè)置和mybatis連接數(shù)據(jù)庫(kù)時(shí)區(qū)設(shè)置方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09
Spring?Cloud?Gateway?2.x跨域時(shí)出現(xiàn)重復(fù)Origin的BUG問(wèn)題
這篇文章主要介紹了Spring?Cloud?Gateway?2.x跨域時(shí)出現(xiàn)重復(fù)Origin的BUG問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-04-04
解決lombok的@Data注解無(wú)法打印繼承的父類信息問(wèn)題
在Java編程中,使用@Data注解可能導(dǎo)致子類繼承父類屬性后,打印只顯示子類信息不顯示父類信息,問(wèn)題源于@Data注解作用域僅限于當(dāng)前類,解決方法包括使用@ToString(callSuper=true)注解或重寫toString方法2024-11-11
深入解析Java的WatchService及功能說(shuō)明
WatchService是Java NIO用于監(jiān)控文件系統(tǒng)目錄變化的API,支持創(chuàng)建、修改、刪除等事件,提供線程安全的注冊(cè)與監(jiān)聽機(jī)制,處理事件溢出及平臺(tái)依賴性問(wèn)題,本文給大家深入解析Java的WatchService及功能說(shuō)明,感興趣的朋友一起看看吧2025-09-09
解讀Camunda中強(qiáng)大的監(jiān)聽服務(wù)
本文介紹Camunda中三種常用監(jiān)聽接口(ExecutionListener、JavaDelegate、TaskListener)及其區(qū)別,強(qiáng)調(diào)Expression和DelegateExpression的使用優(yōu)勢(shì),并指導(dǎo)Spring Boot工程搭建與數(shù)據(jù)庫(kù)配置,建議使用H2數(shù)據(jù)庫(kù)簡(jiǎn)化開發(fā)2025-09-09
Java Swing JPasswordField密碼框的實(shí)現(xiàn)示例
這篇文章主要介紹了Java Swing JPasswordField密碼框的實(shí)現(xiàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-12-12

