jdk自帶線程池實(shí)例詳解
二、簡(jiǎn)介
多線程技術(shù)主要解決處理器單元內(nèi)多個(gè)線程執(zhí)行的問題,它可以顯著減少處理器單元的閑置時(shí)間,增加處理器單元的吞吐能力,但頻繁的創(chuàng)建線程的開銷是很大的,那么如何來減少這部分的開銷了,那么就要考慮使用線程池了。線程池就是一個(gè)線程的容器,每次只執(zhí)行額定數(shù)量的線程,線程池就是用來管理這些額定數(shù)量的線程。
三、涉及線程池的類結(jié)構(gòu)圖

其中供我們使用的,主要是ThreadPoolExecutor類。
四、如何創(chuàng)建線程池
我們創(chuàng)建線程池一般有以下幾種方法:
1、使用Executors工廠類
Executors主要提供了下面幾種創(chuàng)建線程池的方法:

下面來看下使用示例:
1)newFixedThreadPool(固定大小的線程池)
public class FixedThreadPool {
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(5);// 創(chuàng)建一個(gè)固定大小為5的線程池
for (int i = 0; i < 10; i++) {
pool.submit(new MyThread());
}
pool.shutdown();
}
}
public class MyThread extends Thread {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "正在執(zhí)行。。。");
}
}
測(cè)試結(jié)果如下:
pool-1-thread-1正在執(zhí)行。。。
pool-1-thread-2正在執(zhí)行。。。
pool-1-thread-3正在執(zhí)行。。。
pool-1-thread-2正在執(zhí)行。。。
pool-1-thread-3正在執(zhí)行。。。
pool-1-thread-2正在執(zhí)行。。。
pool-1-thread-2正在執(zhí)行。。。
pool-1-thread-3正在執(zhí)行。。。
pool-1-thread-5正在執(zhí)行。。。
pool-1-thread-4正在執(zhí)行。。。
固定大小的線程池:每次提交一個(gè)任務(wù)就創(chuàng)建一個(gè)線程,直到線程達(dá)到線程池的最大大小。線程池的大小一旦達(dá)到最大值就會(huì)保持不變,如果某個(gè)線程因?yàn)閳?zhí)行異常而結(jié)束,那么線程池會(huì)補(bǔ)充一個(gè)新線程線。
2)newSingleThreadExecutor(單線程的線程池)
public class SingleThreadPool {
public static void main(String[] args) {
ExecutorService pool=Executors.newSingleThreadExecutor();//創(chuàng)建一個(gè)單線程池
for(int i=0;i<100;i++){
pool.submit(new MyThread());
}
pool.shutdown();
}
}
測(cè)試結(jié)果如下:
pool-1-thread-1正在執(zhí)行。。。
pool-1-thread-1正在執(zhí)行。。。
pool-1-thread-1正在執(zhí)行。。。
pool-1-thread-1正在執(zhí)行。。。
pool-1-thread-1正在執(zhí)行。。。
pool-1-thread-1正在執(zhí)行。。。
pool-1-thread-1正在執(zhí)行。。。
pool-1-thread-1正在執(zhí)行。。。
pool-1-thread-1正在執(zhí)行。。。
pool-1-thread-1正在執(zhí)行。。。
單線程的線程池:這個(gè)線程池只有一個(gè)線程在工作,也就是相當(dāng)于單線程串行執(zhí)行所有任務(wù)。如果這個(gè)唯一的線程因?yàn)楫惓=Y(jié)束,那么會(huì)有一個(gè)新的線程來替代它。此線程池保證所有任務(wù)的執(zhí)行順序按照任務(wù)的提交順序執(zhí)行。
3)newScheduledThreadPool
public class ScheduledThreadPool {
public static void main(String[] args) {
ScheduledExecutorService pool=Executors.newScheduledThreadPool(6);
for(int i=0;i<10000;i++){
pool.submit(new MyThread());
}
pool.schedule(new MyThread(), 1000, TimeUnit.MILLISECONDS);
pool.schedule(new MyThread(), 1000, TimeUnit.MILLISECONDS);
pool.shutdown();
}
}
測(cè)試結(jié)果如下:
pool-1-thread-1正在執(zhí)行。。。
pool-1-thread-6正在執(zhí)行。。。
pool-1-thread-5正在執(zhí)行。。。
pool-1-thread-4正在執(zhí)行。。。
pool-1-thread-2正在執(zhí)行。。。
pool-1-thread-3正在執(zhí)行。。。
pool-1-thread-4正在執(zhí)行。。。
pool-1-thread-5正在執(zhí)行。。。
pool-1-thread-6正在執(zhí)行。。。
pool-1-thread-1正在執(zhí)行。。。
…………此處會(huì)延時(shí)1S…………
pool-1-thread-4正在執(zhí)行。。。
pool-1-thread-1正在執(zhí)行。。。
測(cè)試結(jié)果的最后兩個(gè)線程都是在延時(shí)1S之后,才開始執(zhí)行的。此線程池支持定時(shí)以及周期性執(zhí)行任務(wù)的需求
4)newCachedThreadPool(可緩存的線程池)
public class CachedThreadPool {
public static void main(String[] args) {
ExecutorService pool=Executors.newCachedThreadPool();
for(int i=0;i<100;i++){
pool.submit(new MyThread());
}
pool.shutdown();
}
}
測(cè)試結(jié)果如下:
pool-1-thread-5正在執(zhí)行。。。
pool-1-thread-7正在執(zhí)行。。。
pool-1-thread-5正在執(zhí)行。。。
pool-1-thread-16正在執(zhí)行。。。
pool-1-thread-17正在執(zhí)行。。。
pool-1-thread-16正在執(zhí)行。。。
pool-1-thread-5正在執(zhí)行。。。
pool-1-thread-7正在執(zhí)行。。。
pool-1-thread-16正在執(zhí)行。。。
pool-1-thread-18正在執(zhí)行。。。
pool-1-thread-10正在執(zhí)行。。。
可緩存的線程池:如果線程池的大小超過了處理任務(wù)所需要的線程,那么就會(huì)回收部分空閑(60秒不執(zhí)行任務(wù))的線程,當(dāng)任務(wù)數(shù)增加時(shí),此線程池又可以智能的添加新線程來處理任務(wù)。此線程池不會(huì)對(duì)線程池大小做限制,線程池大小完全依賴于操作系統(tǒng)(或者說JVM)能夠創(chuàng)建的最大線程大小。
官方建議程序員使用較為方便的Executors工廠方法Executors.newCachedThreadPool()(無界線程池,可以進(jìn)行自動(dòng)線程回收)、Executors.newFixedThreadPool(int)(固定大小線程池)Executors.newSingleThreadExecutor()(單個(gè)后臺(tái)線程),這幾種線程池均為大多數(shù)使用場(chǎng)景預(yù)定義了默認(rèn)配置。
2、繼承ThreadPoolExecutor類,并復(fù)寫父類的構(gòu)造方法。
在介紹這種方式之前,我們來分析下前面幾個(gè)創(chuàng)建線程池的底層代碼是怎樣的?
public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
}
從Executors工廠類的底層代碼可以看出,工廠類提供的創(chuàng)建線程池的方法,其實(shí)都是通過構(gòu)造ThreadPoolExecutor來實(shí)現(xiàn)的。ThreadPoolExecutor構(gòu)造方法代碼如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
那么接下來,我們就來談?wù)勥@個(gè)ThreadPoolExecutor構(gòu)造方法。在這個(gè)構(gòu)造方法中,主要有以下幾個(gè)參數(shù):
corePoolSize--池中所保存的線程數(shù),包括空閑線程。
maximumPoolSize--池中允許的最大線程數(shù)。
keepAliveTime--當(dāng)線程數(shù)大于corePoolSize時(shí),此為終止空閑線程等待新任務(wù)的最長(zhǎng)時(shí)間。
Unit--keepAliveTime 參數(shù)的時(shí)間單位。
workQueue--執(zhí)行前用于保持任務(wù)的隊(duì)列。此隊(duì)列僅保持由 execute方法提交的 Runnable任務(wù)。
threadFactory--執(zhí)行程序創(chuàng)建新線程時(shí)使用的工廠。
Handler--由于超出線程范圍和隊(duì)列容量而使執(zhí)行被阻塞時(shí)所使用的處理程序。
接下來,咋們來說下這幾個(gè)參數(shù)之間的關(guān)系。當(dāng)線程池剛創(chuàng)建的時(shí)候,線程池里面是沒有任何線程的(注意,并不是線程池一創(chuàng)建,里面就創(chuàng)建了一定數(shù)量的線程),當(dāng)調(diào)用execute()方法添加一個(gè)任務(wù)時(shí),線程池會(huì)做如下的判斷:
1)如果當(dāng)前正在運(yùn)行的線程數(shù)量小于corePoolSize,那么立刻創(chuàng)建一個(gè)新的線程,執(zhí)行這個(gè)任務(wù)。
2)如果當(dāng)前正在運(yùn)行的線程數(shù)量大于或等于corePoolSize,那么這個(gè)任務(wù)將會(huì)放入隊(duì)列中。
3)如果線程池的隊(duì)列已經(jīng)滿了,但是正在運(yùn)行的線程數(shù)量小于maximumPoolSize,那么還是會(huì)創(chuàng)建新的線程,執(zhí)行這個(gè)任務(wù)。
4)如果隊(duì)列已經(jīng)滿了,且當(dāng)前正在運(yùn)行的線程數(shù)量大于或等于maximumPoolSize,那么線程池會(huì)根據(jù)拒絕執(zhí)行策略來處理當(dāng)前的任務(wù)。
5)當(dāng)一個(gè)任務(wù)執(zhí)行完后,線程會(huì)從隊(duì)列中取下一個(gè)任務(wù)來執(zhí)行,如果隊(duì)列中沒有需要執(zhí)行的任務(wù),那么這個(gè)線程就會(huì)處于空閑狀態(tài),如果超過了keepAliveTime存活時(shí)間,則這個(gè)線程會(huì)被線程池回收(注:回收線程是有條件的,如果當(dāng)前運(yùn)行的線程數(shù)量大于corePoolSize的話,這個(gè)線程就會(huì)被銷毀,如果不大于corePoolSize,是不會(huì)銷毀這個(gè)線程的,線程的數(shù)量必須保持在corePoolSize數(shù)量?jī)?nèi)).為什么不是線程一空閑就回收,而是需要等到超過keepAliveTime才進(jìn)行線程的回收了,原因很簡(jiǎn)單:因?yàn)榫€程的創(chuàng)建和銷毀消耗很大,更不能頻繁的進(jìn)行創(chuàng)建和銷毀,當(dāng)超過keepAliveTime后,發(fā)現(xiàn)確實(shí)用不到這個(gè)線程了,才會(huì)進(jìn)行銷毀。這其中unit表示keepAliveTime的時(shí)間單位,unit的定義如下:
public enum TimeUnit {
NANOSECONDS {
// keepAliveTime以納秒為單位
},
MICROSECONDS {
// keepAliveTime以微秒為單位
},
MILLISECONDS {
// keepAliveTime以毫秒為單位
},
SECONDS {
// keepAliveTime以秒為單位
},
MINUTES {
// keepAliveTime以分鐘為單位
},
HOURS {
// keepAliveTime以小時(shí)為單位
},
DAYS {
// keepAliveTime以天為單位
};
下面從源碼來分析一下,對(duì)于上面的幾種情況,主要涉及到的源碼有以下幾塊:
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < corePoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}
其實(shí),這段代碼很簡(jiǎn)單,主要描述的就是,如果當(dāng)前的線程池小于corePoolSize的時(shí)候,是直接新建一個(gè)線程來處理任務(wù)。
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < maximumPoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}
上面這段代碼描述的是,如果當(dāng)前線程池的數(shù)量小于maximumPoolSize的時(shí)候,也會(huì)創(chuàng)建一個(gè)線程,來執(zhí)行任務(wù)
五、線程池的隊(duì)列
線程池的隊(duì)列,總的來說有3種:
直接提交:工作隊(duì)列的默認(rèn)選項(xiàng)是 SynchronousQueue,它將任務(wù)直接提交給線程而不保持它們。在此,如果不存在可用于立即運(yùn)行任務(wù)的線程,則試圖把任務(wù)加入隊(duì)列將失敗,因此會(huì)構(gòu)造一個(gè)新的線程。此策略可以避免在處理可能具有內(nèi)部依賴性的請(qǐng)求集時(shí)出現(xiàn)鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務(wù)。當(dāng)命令以超過隊(duì)列所能處理的平均數(shù)連續(xù)到達(dá)時(shí),此策略允許無界線程具有增長(zhǎng)的可能性。
無界隊(duì)列:使用無界隊(duì)列(例如,不具有預(yù)定義容量的 LinkedBlockingQueue)將導(dǎo)致在所有 corePoolSize 線程都忙時(shí)新任務(wù)在隊(duì)列中等待。這樣,創(chuàng)建的線程就不會(huì)超過 corePoolSize。(因此,maximumPoolSize的值也就無效了。)當(dāng)每個(gè)任務(wù)完全獨(dú)立于其他任務(wù),即任務(wù)執(zhí)行互不影響時(shí),適合于使用無界隊(duì)列;例如,在 Web頁服務(wù)器中。這種排隊(duì)可用于處理瞬態(tài)突發(fā)請(qǐng)求,當(dāng)命令以超過隊(duì)列所能處理的平均數(shù)連續(xù)到達(dá)時(shí),此策略允許無界線程具有增長(zhǎng)的可能性。
有界隊(duì)列:當(dāng)使用有限的 maximumPoolSizes時(shí),有界隊(duì)列(如 ArrayBlockingQueue)有助于防止資源耗盡,但是可能較難調(diào)整和控制。隊(duì)列大小和最大池大小可能需要相互折衷:使用大型隊(duì)列和小型池可以最大限度地降低 CPU 使用率、操作系統(tǒng)資源和上下文切換開銷,但是可能導(dǎo)致人工降低吞吐量。如果任務(wù)頻繁阻塞(例如,如果它們是 I/O邊界),則系統(tǒng)可能為超過您許可的更多線程安排時(shí)間。使用小型隊(duì)列通常要求較大的池大小,CPU使用率較高,但是可能遇到不可接受的調(diào)度開銷,這樣也會(huì)降低吞吐量。
下面就來說下線程池的隊(duì)列,類結(jié)構(gòu)圖如下:

1)SynchronousQueue
該隊(duì)列對(duì)應(yīng)的就是上面所說的直接提交,首先SynchronousQueue是無界的,也就是說他存數(shù)任務(wù)的能力是沒有限制的,但是由于該Queue本身的特性,在某次添加元素后必須等待其他線程取走后才能繼續(xù)添加。
2)LinkedBlockingQueue
該隊(duì)列對(duì)應(yīng)的就是上面的無界隊(duì)列。
3)ArrayBlockingQueue
該隊(duì)列對(duì)應(yīng)的就是上面的有界隊(duì)列。ArrayBlockingQueue有以下3中構(gòu)造方法:
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = (E[]) new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
if (capacity < c.size())
throw new IllegalArgumentException();
for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
add(it.next());
}
下面我們重點(diǎn)來說下這個(gè)fair,fair表示隊(duì)列訪問線程的競(jìng)爭(zhēng)策略,當(dāng)為true的時(shí)候,任務(wù)插入隊(duì)列遵從FIFO的規(guī)則,如果為false,則可以“插隊(duì)”。舉個(gè)例子,假如現(xiàn)在有很多任務(wù)在排隊(duì),這個(gè)時(shí)候正好一個(gè)線程執(zhí)行完了任務(wù),同時(shí)又新來了一個(gè)任務(wù),如果為false的話,這個(gè)任務(wù)不用在隊(duì)列中排隊(duì),可以直接插隊(duì),然后執(zhí)行。如下圖所示:

六、線程池的拒絕執(zhí)行策略
當(dāng)線程的數(shù)量達(dá)到最大值時(shí),這個(gè)時(shí)候,任務(wù)還在不斷的來,這個(gè)時(shí)候,就只好拒絕接受任務(wù)了。
ThreadPoolExecutor 允許自定義當(dāng)添加任務(wù)失敗后的執(zhí)行策略。你可以調(diào)用線程池的 setRejectedExecutionHandler() 方法,用自定義的RejectedExecutionHandler 對(duì)象替換現(xiàn)有的策略,ThreadPoolExecutor提供的默認(rèn)的處理策略是直接丟棄,同時(shí)拋異常信息,ThreadPoolExecutor 提供 4 個(gè)現(xiàn)有的策略,分別是:
ThreadPoolExecutor.AbortPolicy:表示拒絕任務(wù)并拋出異常,源碼如下:
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an <tt>AbortPolicy</tt>.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always.
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException(); //拋異常
}
}
ThreadPoolExecutor.DiscardPolicy:表示拒絕任務(wù)但不做任何動(dòng)作,源碼如下:
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a <tt>DiscardPolicy</tt>.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
} // 直接拒絕,但不做任何操作
}
ThreadPoolExecutor.CallerRunsPolicy:表示拒絕任務(wù),并在調(diào)用者的線程中直接執(zhí)行該任務(wù),源碼如下:
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a <tt>CallerRunsPolicy</tt>.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run(); // 直接執(zhí)行任務(wù)
}
}
}
ThreadPoolExecutor.DiscardOldestPolicy:表示先丟棄任務(wù)隊(duì)列中的第一個(gè)任務(wù),然后把這個(gè)任務(wù)加進(jìn)隊(duì)列。源碼如下:
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a <tt>DiscardOldestPolicy</tt> for the given executor.
*/
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll(); // 丟棄隊(duì)列中的第一個(gè)任務(wù)
e.execute(r); // 執(zhí)行新任務(wù)
}
}
}
當(dāng)任務(wù)源源不斷到來的時(shí)候,會(huì)從Queue中poll一個(gè)任務(wù)出來,然后執(zhí)行新的任務(wù)
總結(jié)
以上所述是小編給大家介紹的jdk自帶線程池詳解,希望對(duì)大家有所幫助,如果大家有任何疑問請(qǐng)給我留言,小編會(huì)及時(shí)回復(fù)大家的。在此也非常感謝大家對(duì)腳本之家網(wǎng)站的支持!
相關(guān)文章
舉例解析Java多線程編程中需要注意的一些關(guān)鍵點(diǎn)
這篇文章主要介紹了Java多線程編程中需要注意的一些關(guān)鍵點(diǎn),包括ThreadLocal變量與原子更新等一些深層次的內(nèi)容,需要的朋友可以參考下2015-11-11
MybatisPlus處理四種表與實(shí)體的映射及id自增策略分析
在最近的工作中,碰到一個(gè)比較復(fù)雜的返回結(jié)果,發(fā)現(xiàn)簡(jiǎn)單映射已經(jīng)解決不了這個(gè)問題了,只好去求助百度,學(xué)習(xí)mybatis表與實(shí)體的映射應(yīng)該怎么寫,將學(xué)習(xí)筆記結(jié)合工作碰到的問題寫下本文,供自身查漏補(bǔ)缺,同時(shí)已被不時(shí)之需2022-10-10
SpringBoot中配置nacos的方法實(shí)現(xiàn)
本文主要介紹了SpringBoot中配置nacos的方法實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-08-08

