ThreadPoolExecutor線程池原理及其execute方法(詳解)
jdk1.7.0_79
對于線程池大部分人可能會用,也知道為什么用。無非就是任務(wù)需要異步執(zhí)行,再者就是線程需要統(tǒng)一管理起來。對于從線程池中獲取線程,大部分人可能只知道,我現(xiàn)在需要一個線程來執(zhí)行一個任務(wù),那我就把任務(wù)丟到線程池里,線程池里有空閑的線程就執(zhí)行,沒有空閑的線程就等待。實(shí)際上對于線程池的執(zhí)行原理遠(yuǎn)遠(yuǎn)不止這么簡單。
在Java并發(fā)包中提供了線程池類——ThreadPoolExecutor,實(shí)際上更多的我們可能用到的是Executors工廠類為我們提供的線程池:newFixedThreadPool、newSingleThreadPool、newCachedThreadPool,這三個線程池并不是ThreadPoolExecutor的子類,關(guān)于這幾者之間的關(guān)系,我們先來查看ThreadPoolExecutor,查看源碼發(fā)現(xiàn)其一共有4個構(gòu)造方法。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
首先就從這幾個參數(shù)開始來了解線程池ThreadPoolExecutor的執(zhí)行原理。
corePoolSize:核心線程池的線程數(shù)量
maximumPoolSize:最大的線程池線程數(shù)量
keepAliveTime:線程活動保持時(shí)間,線程池的工作線程空閑后,保持存活的時(shí)間。
unit:線程活動保持時(shí)間的單位。
workQueue:指定任務(wù)隊(duì)列所使用的阻塞隊(duì)列
corePoolSize和maximumPoolSize都在指定線程池中的線程數(shù)量,好像平時(shí)用到線程池的時(shí)候最多就只需要傳遞一個線程池大小的參數(shù)就能創(chuàng)建一個線程池啊,Java為我們提供了一些常用的線程池類就是上面提到的newFixedThreadPool、newSingleThreadExecutor、newCachedThreadPool,當(dāng)然如果我們想要自己發(fā)揮創(chuàng)建自定義的線程池就得自己來“配置”有關(guān)線程池的一些參數(shù)。
當(dāng)把一個任務(wù)交給線程池來處理的時(shí)候,線程池的執(zhí)行原理如下圖所示參考自《Java并發(fā)編程的藝術(shù)》

①首先會判斷核心線程池里是否有線程可執(zhí)行,有空閑線程則創(chuàng)建一個線程來執(zhí)行任務(wù)。
②當(dāng)核心線程池里已經(jīng)沒有線程可執(zhí)行的時(shí)候,此時(shí)將任務(wù)丟到任務(wù)隊(duì)列中去。
③如果任務(wù)隊(duì)列(有界)也已經(jīng)滿了的話,但運(yùn)行的線程數(shù)小于最大線程池的數(shù)量的時(shí)候,此時(shí)將會新建一個線程用于執(zhí)行任務(wù),但如果運(yùn)行的線程數(shù)已經(jīng)達(dá)到最大線程池的數(shù)量的時(shí)候,此時(shí)將無法創(chuàng)建線程執(zhí)行任務(wù)。
所以實(shí)際上對于線程池不僅是單純地將任務(wù)丟到線程池,線程池中有線程就執(zhí)行任務(wù),沒線程就等待。
為鞏固一下線程池的原理,現(xiàn)在再來了解上面提到的常用的3個線程池:
Executors.newFixedThreadPool:創(chuàng)建一個固定數(shù)量線程的線程池。
// Executors#newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
可以看到newFixedThreadPool中調(diào)用的是ThreadPoolExecutor類,傳遞的參數(shù)corePoolSize= maximumPoolSize=nThread?;仡櫨€程池的執(zhí)行原理,當(dāng)一個任務(wù)提交到線程池中,首先判斷核心線程池里有沒有空閑線程,有則創(chuàng)建線程,沒有則將任務(wù)放到任務(wù)隊(duì)列(這里是有界阻塞隊(duì)列LinkedBlockingQueue)中,如果任務(wù)隊(duì)列已經(jīng)滿了的話,對于newFixedThreadPool來說,它的最大線程池?cái)?shù)量=核心線程池?cái)?shù)量,此時(shí)任務(wù)隊(duì)列也滿了,將不能擴(kuò)展創(chuàng)建新的線程來執(zhí)行任務(wù)。
Executors.newSingleThreadExecutor:創(chuàng)建只包含一個線程的線程池?!?/strong>
//Executors# newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegateExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}
只有一個線程的線程池好像有點(diǎn)奇怪,并且并沒有直接將返回ThreadPoolExecutor,甚至也沒有直接將線程池?cái)?shù)量1傳遞給newFixedThreadPool返回。那就說明這個只含有一個線程的線程池,或許并沒有只包含一個線程那么簡單。在其源碼注釋中這么寫到:創(chuàng)建只有一個工作線程的線程池用于操作一個無界隊(duì)列(如果由于前驅(qū)節(jié)點(diǎn)的執(zhí)行被終止結(jié)束了,一個新的線程將會繼續(xù)執(zhí)行后繼節(jié)點(diǎn)線程)任務(wù)得以繼續(xù)執(zhí)行,不同于newFixedThreadPool(1)不會有額外的線程來重新繼續(xù)執(zhí)行后繼節(jié)點(diǎn)。也就是說newSingleThreadExecutor自始至終都只有一個線程在執(zhí)行,這和newFixedThreadPool一樣,但如果線程終止結(jié)束過后newSingleThreadExecutor則會重新創(chuàng)建一個新的線程來繼續(xù)執(zhí)行任務(wù)隊(duì)列中的線程,而newFixedThreaPool則不會。
Executors.newCachedThreadPool:根據(jù)需要創(chuàng)建新線程的線程池。
//Executors#newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPooExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
可以看到newCachedThread返回的是ThreadPoolExecutor,其參數(shù)核心線程池corePoolSize = 0, maximumPoolSize = Integer.MAX_VALUE,這也就是說當(dāng)任務(wù)被提交到newCachedThread線程池時(shí),將會直接把任務(wù)放到SynchronousQueue任務(wù)隊(duì)列中,maximumPool從任務(wù)隊(duì)列中獲取任務(wù)。注意SynchronousQueue是一個沒有容量的隊(duì)列,也就是說每個入隊(duì)操作必須等待另一個線程的對應(yīng)出隊(duì)操作,如果主線程提交任務(wù)的速度高于maximumPool中線程處理任務(wù)的速度時(shí),newCachedThreadPool會不斷創(chuàng)建線程,線程多并不是一件好事,嚴(yán)重會耗盡CPU和內(nèi)存資源。
題外話:newFixedThreadPool、newSingleThreadExecutor、newCachedThreadPool,這三者都直接或間接調(diào)用了ThreadPoolExecutor,為什么它們?nèi)邲]有直接是其子類,而是通過Executors來實(shí)例化呢?這是所采用的靜態(tài)工廠方法,在java.util.Connections接口中同樣也是采用的靜態(tài)工廠方法來創(chuàng)建相關(guān)的類。這樣有很多好處,靜態(tài)工廠方法是用來產(chǎn)生對象的,產(chǎn)生什么對象沒關(guān)系,只要返回原返回類型或原返回類型的子類型都可以,降低API數(shù)目和使用難度,在《Effective Java》中的第1條就是靜態(tài)工廠方法。
回到ThreadPoolExecutor,首先來看它的繼承關(guān)系:

ThreadPoolExecutor它的頂級父類是Executor接口,只包含了一個方法——execute,這個方法也就是線程池的“執(zhí)行”。
//Executor#execute
public interface Executor {
void execute(Runnable command);
}
Executor#execute的實(shí)現(xiàn)則是在ThreadPoolExecutor中實(shí)現(xiàn)的:
//ThreadPoolExecutor#execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
…
}
一來就碰到個不知所云的ctl變量它的定義:
private final AtomicInteger ctl = new AtlmicInteger(ctlOf(RUNNING, 0));
這個變量使用來干嘛的呢?它的作用有點(diǎn)類似我們在《ReadWriteLock接口及其實(shí)現(xiàn)ReentrantReadWriteLock》中提到的讀寫鎖有讀、寫兩個同步狀態(tài),而AQS則只提供了state一個int型變量,此時(shí)將state高16位表示為讀狀態(tài),低16位表示為寫狀態(tài)。這里的clt同樣也是,它表示了兩個概念:
workerCount:當(dāng)前有效的線程數(shù)
runState:當(dāng)前線程池的五種狀態(tài),Running、Shutdown、Stop、Tidying、Terminate。
int型變量一共有32位,線程池的五種狀態(tài)runState至少需要3位來表示,故workCount只能有29位,所以代碼中規(guī)定線程池的有效線程數(shù)最多為229-1。
//ThreadPoolExecutor private static final int COUNT_BITS = Integer.SIZE – 3; //32-3=29,線程數(shù)量所占位數(shù) private static final int CAPACITY = (1 << COUNT_BITS) – 1; //低29位表示最大線程數(shù),229-1 //五種線程池狀態(tài) private static final int RUNNING = -1 << COUNT_BITS; /int型變量高3位(含符號位)101表RUNING private static final int SHUTDOWN = 0 << COUNT_BITS; //高3位000 private static final int STOP = 1 << COUNT_BITS; //高3位001 private static final int TIDYING = 2 << COUNT_BITS; //高3位010 private static final int TERMINATED = 3 << COUNT_BITS; //高3位011
再次回到ThreadPoolExecutor#execute方法:
//ThreadPoolExecutor#execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get(); //由它可以獲取到當(dāng)前有效的線程數(shù)和線程池的狀態(tài)
/*1.獲取當(dāng)前正在運(yùn)行線程數(shù)是否小于核心線程池,是則新創(chuàng)建一個線程執(zhí)行任務(wù),否則將任務(wù)放到任務(wù)隊(duì)列中*/
if (workerCountOf(c) < corePoolSize){
if (addWorker(command, tre)) //在addWorker中創(chuàng)建工作線程執(zhí)行任務(wù)
return ;
c = ctl.get();
}
/*2.當(dāng)前核心線程池中全部線程都在運(yùn)行workerCountOf(c) >= corePoolSize,所以此時(shí)將線程放到任務(wù)隊(duì)列中*/
if (isRunning(c) && workQueue.offer(command)) { //線程池是否處于運(yùn)行狀態(tài),且是否任務(wù)插入任務(wù)隊(duì)列成功
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command)) //線程池是否處于運(yùn)行狀態(tài),如果不是則使剛剛的任務(wù)出隊(duì)
reject(command); //拋出RejectedExceptionException異常
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/*3.插入隊(duì)列不成功,且當(dāng)前線程數(shù)數(shù)量小于最大線程池?cái)?shù)量,此時(shí)則創(chuàng)建新線程執(zhí)行任務(wù),創(chuàng)建失敗拋出異常*/
else if (!addWorker(command, false)){
reject(command); //拋出RejectedExceptionException異常
}
}
上面代碼注釋第7行的即判斷當(dāng)前核心線程池里是否有空閑線程,有則通過addWorker方法創(chuàng)建工作線程執(zhí)行任務(wù)。addWorker方法較長,篩選出重要的代碼來解析。
//ThreadPoolExecutor#addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
/*首先會再次檢查線程池是否處于運(yùn)行狀態(tài),核心線程池中是否還有空閑線程,都滿足條件過后則會調(diào)用compareAndIncrementWorkerCount先將正在運(yùn)行的線程數(shù)+1,數(shù)量自增成功則跳出循環(huán),自增失敗則繼續(xù)從頭繼續(xù)循環(huán)*/
...
if (compareAndIncrementWorkerCount(c))
break retry;
...
/*正在運(yùn)行的線程數(shù)自增成功后則將線程封裝成工作線程Worker*/
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock; //全局鎖
w = new Woker(firstTask); //將線程封裝為Worker工作線程
final Thread t = w.thread;
if (t != null) {
mainLock.lock(); //獲取全局鎖
/*當(dāng)持有了全局鎖的時(shí)候,還需要再次檢查線程池的運(yùn)行狀態(tài)等*/
try {
int c = clt.get();
int rs = runStateOf(c); //線程池運(yùn)行狀態(tài)
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)){ //線程池處于運(yùn)行狀態(tài),或者線程池關(guān)閉且任務(wù)線程為空
if (t.isAlive()) //線程處于活躍狀態(tài),即線程已經(jīng)開始執(zhí)行或者還未死亡,正確的應(yīng)線程在這里應(yīng)該是還未開始執(zhí)行的
throw new IllegalThreadStateException();
workers.add(w); //private final HashSet<Worker> wokers = new HashSet<Worker>();包含線程池中所有的工作線程,只有在獲取了全局的時(shí)候才能訪問它。將新構(gòu)造的工作線程加入到工作線程集合中
int s = worker.size(); //工作線程數(shù)量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true; //新構(gòu)造的工作線程加入成功
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); //在被構(gòu)造為Worker工作線程,且被加入到工作線程集合中后,執(zhí)行線程任務(wù),注意這里的start實(shí)際上執(zhí)行Worker中run方法,所以接下來分析Worker的run方法
workerStarted = true;
}
}
} finally {
if (!workerStarted) //未能成功創(chuàng)建執(zhí)行工作線程
addWorkerFailed(w); //在啟動工作線程失敗后,將工作線程從集合中移除
}
return workerStarted;
}
在上面第35代碼中,工作線程被成功添加到工作線程集合中后,則開始start執(zhí)行,這里start執(zhí)行的是Worker工作線程中的run方法。
//ThreadPoolExecutor$Worker,它繼承了AQS,同時(shí)實(shí)現(xiàn)了Runnable,所以它具備了這兩者的所有特性
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
final Thread thread;
Runnable firstTask;
public Worker(Runnable firstTask) {
setState(-1); //設(shè)置AQS的同步狀態(tài)為-1,禁止中斷,直到調(diào)用runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); //通過線程工廠來創(chuàng)建一個線程,將自身作為Runnable傳遞傳遞
}
public void run() {
runWorker(this); //運(yùn)行工作線程
}
}
ThreadPoolExecutor#runWorker,在此方法中,Worker在執(zhí)行完任務(wù)后,還會循環(huán)獲取任務(wù)隊(duì)列里的任務(wù)執(zhí)行(其中的getTask方法),也就是說Worker不僅僅是在執(zhí)行完給它的任務(wù)就釋放或者結(jié)束,它不會閑著,而是繼續(xù)從任務(wù)隊(duì)列中獲取任務(wù),直到任務(wù)隊(duì)列中沒有任務(wù)可執(zhí)行時(shí),它才退出循環(huán)完成任務(wù)。理解了以上的源碼過后,往后線程池執(zhí)行原理的第二步、第三步的理解實(shí)則水到渠成。
以上這篇ThreadPoolExecutor線程池原理及其execute方法(詳解)就是小編分享給大家的全部內(nèi)容了,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
SpringMVC 通過commons-fileupload實(shí)現(xiàn)文件上傳功能
這篇文章主要介紹了SpringMVC 通過commons-fileupload實(shí)現(xiàn)文件上傳,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-02-02
Java中的FileWriter用法詳解與實(shí)戰(zhàn)記錄
這篇文章主要給大家介紹了關(guān)于Java中FileWriter用法的相關(guān)資料,包括寫入字符數(shù)據(jù)到文件、字符數(shù)組和部分字符寫入、配合BufferedWriter使用等方法,同時(shí)也解釋了其與OutputStreamWriter,BufferedWriter的異同特性,適合簡單的文件寫入操作,需要的朋友可以參考下2024-10-10
javaweb中mysql數(shù)據(jù)庫連接步驟方法及其實(shí)例
這篇文章主要介紹了使用java web 連接MySQL數(shù)據(jù)庫的驅(qū)動方法的相關(guān)知識,本文介紹的非常詳細(xì),具有參考借鑒價(jià)值,需要的朋友可以參考下2017-04-04
maven中自定義MavenArchetype的實(shí)現(xiàn)
Maven自身提供了許多Archetype來方便用戶創(chuàng)建Project,為了避免在創(chuàng)建project時(shí)重復(fù)的拷貝和修改,我們通過自定義Archetype來規(guī)范顯得還蠻有必要,下面就來介紹一下,感興趣的可以了解一下2025-01-01
Quarkus中實(shí)現(xiàn)Resteasy的文件上傳下載操作
這篇文章主要為大家介紹了Quarkus中實(shí)現(xiàn)Resteasy的文件上傳下載的操作過程步驟,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步2022-02-02
使用springboot整合mybatis-plus實(shí)現(xiàn)數(shù)據(jù)庫的增刪查改示例
這篇文章主要介紹了使用springboot整合mybatis-plus實(shí)現(xiàn)數(shù)據(jù)庫的增刪查改示例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04

