SpringBoot 多任務(wù)并行+線程池處理的實(shí)現(xiàn)
前言
前幾篇文章著重介紹了后端服務(wù)數(shù)據(jù)庫和多線程并行處理優(yōu)化,并示例了改造前后的偽代碼邏輯。當(dāng)然了,優(yōu)化是無止境的,前人栽樹后人乘涼。作為我們開發(fā)者來說,既然站在了巨人的肩膀上,就要寫出更加優(yōu)化的程序。
SpringBoot開發(fā)案例之JdbcTemplate批量操作
SpringBoot開發(fā)案例之CountDownLatch多任務(wù)并行處理
改造
理論上講,線程越多程序可能更快,但是在實(shí)際使用中我們需要考慮到線程本身的創(chuàng)建以及銷毀的資源消耗,以及保護(hù)操作系統(tǒng)本身的目的。我們通常需要將線程限制在一定的范圍之類,線程池就起到了這樣的作用。
程序邏輯

多任務(wù)并行+線程池處理.png
一張圖能解決的問題,就應(yīng)該盡可能的少BB,當(dāng)然底層原理性的東西還是需要大家去記憶并理解的。
Java 線程池
Java通過Executors提供四種線程池,分別為:
- newCachedThreadPool創(chuàng)建一個(gè)可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
- newFixedThreadPool 創(chuàng)建一個(gè)定長線程池,可控制線程最大并發(fā)數(shù),超出的線程會(huì)在隊(duì)列中等待。
- newScheduledThreadPool 創(chuàng)建一個(gè)定長線程池,支持定時(shí)及周期性任務(wù)執(zhí)行。
- newSingleThreadExecutor 創(chuàng)建一個(gè)單線程化的線程池,它只會(huì)用唯一的工作線程來執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級(jí))執(zhí)行。
優(yōu)點(diǎn)
- 重用存在的線程,減少對(duì)象創(chuàng)建、消亡的開銷,性能佳。
- 可有效控制最大并發(fā)線程數(shù),提高系統(tǒng)資源的使用率,同時(shí)避免過多資源競爭,避免堵塞。
- 提供定時(shí)執(zhí)行、定期執(zhí)行、單線程、并發(fā)數(shù)控制等功能。
代碼實(shí)現(xiàn)
方式一(CountDownLatch)
/**
* 多任務(wù)并行+線程池統(tǒng)計(jì)
* 創(chuàng)建時(shí)間 2018年4月17日
*/
public class StatsDemo {
final static SimpleDateFormat sdf = new SimpleDateFormat(
"yyyy-MM-dd HH:mm:ss");
final static String startTime = sdf.format(new Date());
/**
* IO密集型任務(wù) = 一般為2*CPU核心數(shù)(常出現(xiàn)于線程中:數(shù)據(jù)庫數(shù)據(jù)交互、文件上傳下載、網(wǎng)絡(luò)數(shù)據(jù)傳輸?shù)鹊龋?
* CPU密集型任務(wù) = 一般為CPU核心數(shù)+1(常出現(xiàn)于線程中:復(fù)雜算法)
* 混合型任務(wù) = 視機(jī)器配置和復(fù)雜度自測而定
*/
private static int corePoolSize = Runtime.getRuntime().availableProcessors();
/**
* public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,
* TimeUnit unit,BlockingQueue<Runnable> workQueue)
* corePoolSize用于指定核心線程數(shù)量
* maximumPoolSize指定最大線程數(shù)
* keepAliveTime和TimeUnit指定線程空閑后的最大存活時(shí)間
* workQueue則是線程池的緩沖隊(duì)列,還未執(zhí)行的線程會(huì)在隊(duì)列中等待
* 監(jiān)控隊(duì)列長度,確保隊(duì)列有界
* 不當(dāng)?shù)木€程池大小會(huì)使得處理速度變慢,穩(wěn)定性下降,并且導(dǎo)致內(nèi)存泄露。如果配置的線程過少,則隊(duì)列會(huì)持續(xù)變大,消耗過多內(nèi)存。
* 而過多的線程又會(huì) 由于頻繁的上下文切換導(dǎo)致整個(gè)系統(tǒng)的速度變緩——殊途而同歸。隊(duì)列的長度至關(guān)重要,它必須得是有界的,這樣如果線程池不堪重負(fù)了它可以暫時(shí)拒絕掉新的請(qǐng)求。
* ExecutorService 默認(rèn)的實(shí)現(xiàn)是一個(gè)無界的 LinkedBlockingQueue。
*/
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, corePoolSize+1, 10l, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1000));
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(5);
//使用execute方法
executor.execute(new Stats("任務(wù)A", 1000, latch));
executor.execute(new Stats("任務(wù)B", 1000, latch));
executor.execute(new Stats("任務(wù)C", 1000, latch));
executor.execute(new Stats("任務(wù)D", 1000, latch));
executor.execute(new Stats("任務(wù)E", 1000, latch));
latch.await();// 等待所有人任務(wù)結(jié)束
System.out.println("所有的統(tǒng)計(jì)任務(wù)執(zhí)行完成:" + sdf.format(new Date()));
}
static class Stats implements Runnable {
String statsName;
int runTime;
CountDownLatch latch;
public Stats(String statsName, int runTime, CountDownLatch latch) {
this.statsName = statsName;
this.runTime = runTime;
this.latch = latch;
}
public void run() {
try {
System.out.println(statsName+ " do stats begin at "+ startTime);
//模擬任務(wù)執(zhí)行時(shí)間
Thread.sleep(runTime);
System.out.println(statsName + " do stats complete at "+ sdf.format(new Date()));
latch.countDown();//單次任務(wù)結(jié)束,計(jì)數(shù)器減一
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
方式二(Future)
/**
* 多任務(wù)并行+線程池統(tǒng)計(jì)
* 創(chuàng)建時(shí)間 2018年4月17日
*/
public class StatsDemo {
final static SimpleDateFormat sdf = new SimpleDateFormat(
"yyyy-MM-dd HH:mm:ss");
final static String startTime = sdf.format(new Date());
/**
* IO密集型任務(wù) = 一般為2*CPU核心數(shù)(常出現(xiàn)于線程中:數(shù)據(jù)庫數(shù)據(jù)交互、文件上傳下載、網(wǎng)絡(luò)數(shù)據(jù)傳輸?shù)鹊龋?
* CPU密集型任務(wù) = 一般為CPU核心數(shù)+1(常出現(xiàn)于線程中:復(fù)雜算法)
* 混合型任務(wù) = 視機(jī)器配置和復(fù)雜度自測而定
*/
private static int corePoolSize = Runtime.getRuntime().availableProcessors();
/**
* public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,
* TimeUnit unit,BlockingQueue<Runnable> workQueue)
* corePoolSize用于指定核心線程數(shù)量
* maximumPoolSize指定最大線程數(shù)
* keepAliveTime和TimeUnit指定線程空閑后的最大存活時(shí)間
* workQueue則是線程池的緩沖隊(duì)列,還未執(zhí)行的線程會(huì)在隊(duì)列中等待
* 監(jiān)控隊(duì)列長度,確保隊(duì)列有界
* 不當(dāng)?shù)木€程池大小會(huì)使得處理速度變慢,穩(wěn)定性下降,并且導(dǎo)致內(nèi)存泄露。如果配置的線程過少,則隊(duì)列會(huì)持續(xù)變大,消耗過多內(nèi)存。
* 而過多的線程又會(huì) 由于頻繁的上下文切換導(dǎo)致整個(gè)系統(tǒng)的速度變緩——殊途而同歸。隊(duì)列的長度至關(guān)重要,它必須得是有界的,這樣如果線程池不堪重負(fù)了它可以暫時(shí)拒絕掉新的請(qǐng)求。
* ExecutorService 默認(rèn)的實(shí)現(xiàn)是一個(gè)無界的 LinkedBlockingQueue。
*/
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, corePoolSize+1, 10l, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1000));
public static void main(String[] args) throws InterruptedException {
List<Future<String>> resultList = new ArrayList<Future<String>>();
//使用submit提交異步任務(wù),并且獲取返回值為future
resultList.add(executor.submit(new Stats("任務(wù)A", 1000)));
resultList.add(executor.submit(new Stats("任務(wù)B", 1000)));
resultList.add(executor.submit(new Stats("任務(wù)C", 1000)));
resultList.add(executor.submit(new Stats("任務(wù)D", 1000)));
resultList.add(executor.submit(new Stats("任務(wù)E", 1000)));
//遍歷任務(wù)的結(jié)果
for (Future<String> fs : resultList) {
try {
System.out.println(fs.get());//打印各個(gè)線任務(wù)執(zhí)行的結(jié)果,調(diào)用future.get() 阻塞主線程,獲取異步任務(wù)的返回結(jié)果
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
//啟動(dòng)一次順序關(guān)閉,執(zhí)行以前提交的任務(wù),但不接受新任務(wù)。如果已經(jīng)關(guān)閉,則調(diào)用沒有其他作用。
executor.shutdown();
}
}
System.out.println("所有的統(tǒng)計(jì)任務(wù)執(zhí)行完成:" + sdf.format(new Date()));
}
static class Stats implements Callable<String> {
String statsName;
int runTime;
public Stats(String statsName, int runTime) {
this.statsName = statsName;
this.runTime = runTime;
}
public String call() {
try {
System.out.println(statsName+ " do stats begin at "+ startTime);
//模擬任務(wù)執(zhí)行時(shí)間
Thread.sleep(runTime);
System.out.println(statsName + " do stats complete at "+ sdf.format(new Date()));
} catch (InterruptedException e) {
e.printStackTrace();
}
return call();
}
}
}
執(zhí)行時(shí)間
以上代碼,均是偽代碼,下面是2000+個(gè)學(xué)生的真實(shí)測試記錄。
2018-04-17 17:42:29.284 INFO 測試記錄81e51ab031eb4ada92743ddf66528d82-單線程順序執(zhí)行,花費(fèi)時(shí)間:3797
2018-04-17 17:42:31.452 INFO 測試記錄81e51ab031eb4ada92743ddf66528d82-多線程并行任務(wù),花費(fèi)時(shí)間:2167
2018-04-17 17:42:33.170 INFO 測試記錄81e51ab031eb4ada92743ddf66528d82-多線程并行任務(wù)+線程池,花費(fèi)時(shí)間:1717
以上就是本文的全部內(nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Struts2學(xué)習(xí)筆記(5)-參數(shù)傳遞方法
本文主要介紹Struts2中參數(shù)傳遞方法,希望能給大家做一個(gè)參考。2016-06-06
JPA在不寫sql的情況下如何實(shí)現(xiàn)模糊查詢
文章介紹了在項(xiàng)目中實(shí)現(xiàn)模糊查詢的幾種方法,包括使用JPA的API、JPQL、QueryByExample和@Query注解,通過實(shí)現(xiàn)Specification接口和定義接口繼承JpaRepository,可以方便地進(jìn)行單字段和多字段的模糊查詢,文章還提到了BINARY函數(shù)的使用以及查詢結(jié)果的返回2024-11-11
Nacos心跳時(shí)間配置及服務(wù)快速上下線方式
這篇文章主要介紹了Nacos心跳時(shí)間配置及服務(wù)快速上下線方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-03-03
java利用注解實(shí)現(xiàn)簡單的excel數(shù)據(jù)讀取
這篇文章主要為大家詳細(xì)介紹了java利用注解實(shí)現(xiàn)簡單的excel數(shù)據(jù)讀取,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-06-06
springboot?vue測試平臺(tái)接口定義及發(fā)送請(qǐng)求功能實(shí)現(xiàn)
這篇文章主要為大家介紹了springboot+vue測試平臺(tái)接口定義及發(fā)送請(qǐng)求功能實(shí)現(xiàn),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-05-05
Java中SynchronousQueue的底層實(shí)現(xiàn)原理剖析
BlockingQueue的實(shí)現(xiàn)類中,有一種阻塞隊(duì)列比較特殊,就是SynchronousQueue(同步移交隊(duì)列),隊(duì)列長度為0。本文就來剖析一下SynchronousQueue的底層實(shí)現(xiàn)原理,感興趣的可以了解一下2022-11-11

