Java線程的并發(fā)工具類實(shí)現(xiàn)原理解析
在JDK的并發(fā)包里提供了幾個(gè)非常有用的并發(fā)工具類。CountDownLatch、CyclicBarrier和Semaphore工具類提供了一種并發(fā)流程控制的手段,Exchanger工具類則提供了在線程間交換數(shù)據(jù)的一種手段。本章會(huì)配合一些應(yīng)用場(chǎng)景來(lái)介紹如何使用這些工具類。
一、fork/join
1. Fork-Join原理
在必要的情況下,將一個(gè)大任務(wù),拆分(fork)成若干個(gè)小任務(wù),然后再將一個(gè)個(gè)小任務(wù)的結(jié)果進(jìn)行匯總(join)。
適用場(chǎng)景:大數(shù)據(jù)量統(tǒng)計(jì)類任務(wù)。

2. 工作竊取
Fork/Join在實(shí)現(xiàn)上,大任務(wù)拆分出來(lái)的小任務(wù)會(huì)被分發(fā)到不同的隊(duì)列里面,每一個(gè)隊(duì)列都會(huì)用一個(gè)線程來(lái)消費(fèi),這是為了獲取任務(wù)時(shí)的多線程競(jìng)爭(zhēng),但是某些線程會(huì)提前消費(fèi)完自己的隊(duì)列。而有些線程沒有及時(shí)消費(fèi)完隊(duì)列,這個(gè)時(shí)候,完成了任務(wù)的線程就會(huì)去竊取那些沒有消費(fèi)完成的線程的任務(wù)隊(duì)列,為了減少線程競(jìng)爭(zhēng),F(xiàn)ork/Join使用雙端隊(duì)列來(lái)存取小任務(wù),分配給這個(gè)隊(duì)列的線程會(huì)一直從頭取得一個(gè)任務(wù)然后執(zhí)行,而竊取線程總是從隊(duì)列的尾端拉取task。
3. 代碼實(shí)現(xiàn)
我們要使用 ForkJoin 框架,必須首先創(chuàng)建一個(gè) ForkJoin 任務(wù)。它提供在任務(wù)中執(zhí)行 fork 和 join 的操作機(jī)制,通常我們不直接繼承 ForkjoinTask 類,只需要直接繼承其子類。
1、RecursiveAction,用于沒有返回結(jié)果的任務(wù)。
2、RecursiveTask,用于有返回值的任務(wù)。
task 要通過(guò) ForkJoinPool 來(lái)執(zhí)行,使用 invoke、execute、submit提交,兩者的區(qū)別是:invoke 是同步執(zhí)行,調(diào)用之后需要等待任務(wù)完成,才能執(zhí)行后面的代碼;execute、submit 是異步執(zhí)行。
示例1:長(zhǎng)度400萬(wàn)的隨機(jī)數(shù)組求和,使用RecursiveTask 。
/**
* 隨機(jī)產(chǎn)生ARRAY_LENGTH長(zhǎng)的的隨機(jī)數(shù)組
*/
public class MakeArray {
// 數(shù)組長(zhǎng)度
public static final int ARRAY_LENGTH = 4000000;
public static int[] makeArray() {
// new一個(gè)隨機(jī)數(shù)發(fā)生器
Random r = new Random();
int[] result = new int[ARRAY_LENGTH];
for (int i = 0; i < ARRAY_LENGTH; i++) {
// 用隨機(jī)數(shù)填充數(shù)組
result[i] = r.nextInt(ARRAY_LENGTH * 3);
}
return result;
}
}
public class SumArray {
private static class SumTask extends RecursiveTask<Integer> {
// 閾值
private final static int THRESHOLD = MakeArray.ARRAY_LENGTH / 10;
private int[] src;
private int fromIndex;
private int toIndex;
public SumTask(int[] src, int fromIndex, int toIndex) {
this.src = src;
this.fromIndex = fromIndex;
this.toIndex = toIndex;
}
@Override
protected Integer compute() {
// 任務(wù)的大小是否合適
if ((toIndex - fromIndex) < THRESHOLD) {
System.out.println(" from index = " + fromIndex + " toIndex=" + toIndex);
int count = 0;
for (int i = fromIndex; i <= toIndex; i++) {
count = count + src[i];
}
return count;
} else {
// fromIndex....mid.....toIndex
int mid = (fromIndex + toIndex) / 2;
SumTask left = new SumTask(src, fromIndex, mid);
SumTask right = new SumTask(src, mid + 1, toIndex);
invokeAll(left, right);
return left.join() + right.join();
}
}
}
public static void main(String[] args) {
int[] src = MakeArray.makeArray();
// new出池的實(shí)例
ForkJoinPool pool = new ForkJoinPool();
// new出Task的實(shí)例
SumTask innerFind = new SumTask(src, 0, src.length - 1);
long start = System.currentTimeMillis();
// invoke阻塞方法
pool.invoke(innerFind);
System.out.println("Task is Running.....");
System.out.println("The count is " + innerFind.join()
+ " spend time:" + (System.currentTimeMillis() - start) + "ms");
}
}
示例2:遍歷指定目錄(含子目錄)下面的txt文件。
public class FindDirsFiles extends RecursiveAction {
private File path;
public FindDirsFiles(File path) {
this.path = path;
}
@Override
protected void compute() {
List<FindDirsFiles> subTasks = new ArrayList<>();
File[] files = path.listFiles();
if (files!=null){
for (File file : files) {
if (file.isDirectory()) {
// 對(duì)每個(gè)子目錄都新建一個(gè)子任務(wù)。
subTasks.add(new FindDirsFiles(file));
} else {
// 遇到文件,檢查。
if (file.getAbsolutePath().endsWith("txt")){
System.out.println("文件:" + file.getAbsolutePath());
}
}
}
if (!subTasks.isEmpty()) {
// 在當(dāng)前的 ForkJoinPool 上調(diào)度所有的子任務(wù)。
for (FindDirsFiles subTask : invokeAll(subTasks)) {
subTask.join();
}
}
}
}
public static void main(String [] args){
try {
// 用一個(gè) ForkJoinPool 實(shí)例調(diào)度總?cè)蝿?wù)
ForkJoinPool pool = new ForkJoinPool();
FindDirsFiles task = new FindDirsFiles(new File("F:/"));
// 異步提交
pool.execute(task);
// 主線程做自己的業(yè)務(wù)工作
System.out.println("Task is Running......");
Thread.sleep(1);
int otherWork = 0;
for(int i=0;i<100;i++){
otherWork = otherWork+i;
}
System.out.println("Main Thread done sth......,otherWork=" + otherWork);
System.out.println("Task end");
} catch (Exception e) {
e.printStackTrace();
}
}
}
二、CountDownLatch
閉鎖,CountDownLatch 這個(gè)類能夠使一個(gè)線程等待其他線程完成各自的工作后再執(zhí)行。例如,應(yīng)用程序的主線程希望在負(fù)責(zé)啟動(dòng)框架服務(wù)的線程已經(jīng)啟動(dòng)所有的框架服務(wù)之后再執(zhí)行。
CountDownLatch 是通過(guò)一個(gè)計(jì)數(shù)器來(lái)實(shí)現(xiàn)的,計(jì)數(shù)器的初始值為初始任務(wù)的數(shù)量。每當(dāng)完成了一個(gè)任務(wù)后,計(jì)數(shù)器的值就會(huì)減 1(CountDownLatch.countDown()方法)。當(dāng)計(jì)數(shù)器值到達(dá) 0 時(shí),它表示所有的已經(jīng)完成了任務(wù),然后在閉鎖上等待 CountDownLatch.await()方法的線程就可以恢復(fù)執(zhí)行任務(wù)。
示例代碼:
public class CountDownLatchTest {
private static CountDownLatch countDownLatch = new CountDownLatch(2);
private static class BusinessThread extends Thread {
@Override
public void run() {
try {
System.out.println("BusinessThread " + Thread.currentThread().getName() + " start....");
Thread.sleep(3000);
System.out.println("BusinessThread " + Thread.currentThread().getName() + " end.....");
// 計(jì)數(shù)器減1
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
System.out.println("main start....");
new BusinessThread().start();
new BusinessThread().start();
// 等待countDownLatch計(jì)數(shù)器為零后執(zhí)行后面代碼
countDownLatch.await();
System.out.println("main end");
}
}
注意點(diǎn):
1、CountDownLatch(2)并不代表對(duì)應(yīng)兩個(gè)線程。
2、一個(gè)線程中可以多次countDownLatch.countDown(),比如在一個(gè)線程中countDown兩次或者多次。
三、CyclicBarrier
CyclicBarrier 的字面意思是可循環(huán)使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達(dá)一個(gè)屏障(也可以叫同步點(diǎn))時(shí)被阻塞,直到最后一個(gè)線程到達(dá)屏障時(shí),屏障才會(huì)開門,所有被屏障攔截的線程才會(huì)繼續(xù)運(yùn)行。
CyclicBarrier 默認(rèn)的構(gòu)造方法是 CyclicBarrier(int parties),其參數(shù)表示屏障攔截的線程數(shù)量,每個(gè)線程調(diào)用 await 方法告訴 CyclicBarrier 我已經(jīng)到達(dá)了屏障,然后當(dāng)前線程被阻塞。
CyclicBarrier 還提供一個(gè)更高級(jí)的構(gòu)造函數(shù) CyclicBarrie(r int parties,Runnable barrierAction),用于在線程全部到達(dá)屏障時(shí),優(yōu)先執(zhí)行 barrierAction,方便處理更復(fù)雜的業(yè)務(wù)場(chǎng)景。
示例代碼:
public class CyclicBarrierTest {
private static CyclicBarrier barrier = new CyclicBarrier(4, new CollectThread());
/**
* 存放子線程工作結(jié)果的容器
*/
private static ConcurrentHashMap<String, Long> resultMap = new ConcurrentHashMap<>();
public static void main(String[] args) {
for (int i = 0; i < 4; i++) {
Thread thread = new Thread(new SubThread());
thread.start();
}
}
/**
* 匯總的任務(wù)
*/
private static class CollectThread implements Runnable {
@Override
public void run() {
StringBuilder result = new StringBuilder();
for (Map.Entry<String, Long> workResult : resultMap.entrySet()) {
result.append("[" + workResult.getValue() + "]");
}
System.out.println(" the result = " + result);
System.out.println("colletThread end.....");
}
}
/**
* 相互等待的子線程
*/
private static class SubThread implements Runnable {
@Override
public void run() {
long id = Thread.currentThread().getId();
resultMap.put(Thread.currentThread().getId() + "", id);
try {
Thread.sleep(1000 + id);
System.out.println("Thread_" + id + " end1.....");
barrier.await();
Thread.sleep(1000 + id);
System.out.println("Thread_" + id + " end2.....");
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
注意: 一個(gè)線程中可以多次await();
四、Semaphore
Semaphore(信號(hào)量)是用來(lái)控制同時(shí)訪問(wèn)特定資源的線程數(shù)量,它通過(guò)協(xié)調(diào)各個(gè)線程,以保證合理的使用公共資源。應(yīng)用場(chǎng)景 Semaphore 可以用于做流量控制,特別是公用資源有限的應(yīng)用場(chǎng)景,比如數(shù)據(jù)庫(kù)連接池?cái)?shù)量。
方法:常用的前4個(gè)。
| 方法 | 描述 |
|---|---|
| acquire() | 獲取連接 |
| release() | 歸還連接數(shù) |
| intavailablePermits() | 返回此信號(hào)量中當(dāng)前可用的許可證數(shù) |
| intgetQueueLength() | 返回正在等待獲取許可證的線程數(shù) |
| void reducePermit(s int reduction) | 減少 reduction 個(gè)許可證,是個(gè) protected 方法 |
| Collection getQueuedThreads() | 返回所有等待獲取許可證的線程集合,是個(gè) protected 方法 |
示例代碼:模擬數(shù)據(jù)庫(kù)連接池。
/**
* 數(shù)據(jù)庫(kù)連接
*/
public class SqlConnectImpl implements Connection {
/**
* 得到一個(gè)數(shù)據(jù)庫(kù)連接
*/
public static final Connection fetchConnection(){
return new SqlConnectImpl();
}
// 省略其他代碼
}
/**
* 連接池代碼
*/
public class DBPoolSemaphore {
private final static int POOL_SIZE = 10;
// 兩個(gè)指示器,分別表示池子還有可用連接和已用連接
private final Semaphore useful;
private final Semaphore useless;
// 存放數(shù)據(jù)庫(kù)連接的容器
private static LinkedList<Connection> pool = new LinkedList<Connection>();
// 初始化池
static {
for (int i = 0; i < POOL_SIZE; i++) {
pool.addLast(SqlConnectImpl.fetchConnection());
}
}
public DBPoolSemaphore() {
this.useful = new Semaphore(10);
this.useless = new Semaphore(0);
}
/**
* 歸還連接
*/
public void returnConnect(Connection connection) throws InterruptedException {
if (connection != null) {
System.out.println("當(dāng)前有" + useful.getQueueLength() + "個(gè)線程等待數(shù)據(jù)庫(kù)連接!!"
+ "可用連接數(shù):" + useful.availablePermits());
useless.acquire();
synchronized (pool) {
pool.addLast(connection);
}
useful.release();
}
}
/**
* 從池子拿連接
*/
public Connection takeConnect() throws InterruptedException {
useful.acquire();
Connection connection;
synchronized (pool) {
connection = pool.removeFirst();
}
useless.release();
return connection;
}
}
/**
* 測(cè)試代碼
*/
public class AppTest {
private static DBPoolSemaphore dbPool = new DBPoolSemaphore();
private static class BusiThread extends Thread {
@Override
public void run() {
// 讓每個(gè)線程持有連接的時(shí)間不一樣
Random r = new Random();
long start = System.currentTimeMillis();
try {
Connection connect = dbPool.takeConnect();
System.out.println("Thread_" + Thread.currentThread().getId()
+ "_獲取數(shù)據(jù)庫(kù)連接共耗時(shí)【" + (System.currentTimeMillis() - start) + "】ms.");
//模擬業(yè)務(wù)操作,線程持有連接查詢數(shù)據(jù)
Thread.sleep(100 + r.nextInt(100));
System.out.println("查詢數(shù)據(jù)完成,歸還連接!");
dbPool.returnConnect(connect);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
for (int i = 0; i < 50; i++) {
Thread thread = new BusiThread();
thread.start();
}
}
}
當(dāng)然,你也可以使用一個(gè) semaphore 來(lái)實(shí)現(xiàn),不過(guò)需要注意的是 semaphore 的初始數(shù)量為10并不是固定的,如果你后面歸還連接時(shí) dbPool.returnConnect(new SqlConnectImpl()); 的話,那么他的數(shù)量會(huì)變成 11 。
五、Exchange
Exchanger(交換者)是一個(gè)用于線程間協(xié)作的工具類。Exchanger 用于進(jìn)行線程間的數(shù)據(jù)交換。它提供一個(gè)同步點(diǎn),在這個(gè)同步點(diǎn),兩個(gè)線程可以交換彼此的數(shù)據(jù)。這兩個(gè)線程通過(guò) exchange() 方法交換數(shù)據(jù),如果第一個(gè)線程先執(zhí)行 exchange() 方法,它會(huì)一直等待第二個(gè)線程也執(zhí)行 exchange() 方法,當(dāng)兩個(gè)線程都到達(dá)同步點(diǎn)時(shí),這兩個(gè)線程就可以交換數(shù)據(jù),將本線程生產(chǎn)出來(lái)的數(shù)據(jù)傳遞給對(duì)方。
但是這種只能在兩個(gè)線程種傳遞,適用面過(guò)于狹窄。
六、Callable、Future、FutureTask
- Runnable 是一個(gè)接口,在它里面只聲明了一個(gè) run()方法,由于 run()方法返回值為 void 類型,所以在執(zhí)行完任務(wù)之后無(wú)法返回任何結(jié)果。
- Callable 位于 java.util.concurrent 包下,它也是一個(gè)接口,在它里面也只聲明了一個(gè)方法,只不過(guò)這個(gè)方法叫做 call(),這是一個(gè)泛型接口,call()函數(shù)返回的類型就是傳遞進(jìn)來(lái)的 V 類型。
- Future 就是對(duì)于具體的 Runnable 或者 Callable 任務(wù)的執(zhí)行結(jié)果進(jìn)行取消、查詢是否完成、獲取結(jié)果。必要時(shí)可以通過(guò) get 方法獲取執(zhí)行結(jié)果,該方法會(huì)阻塞直到任務(wù)返回結(jié)果。
- FutureTask 因?yàn)?Future 只是一個(gè)接口,所以是無(wú)法直接用來(lái)創(chuàng)建對(duì)象使用的,因此就有了 FutureTask 。
關(guān)系圖示:

所以,我們可以通過(guò) FutureTask 把一個(gè) Callable 包裝成 Runnable,然后再通過(guò)這個(gè) FutureTask 拿到 Callable 運(yùn)行后的返回值。
示例代碼:
public class FutureTaskTest {
private static class CallableTest implements Callable<Integer> {
private int sum = 0;
@Override
public Integer call() throws Exception {
System.out.println("Callable 子線程開始計(jì)算!");
for (int i = 0; i < 5000; i++) {
if (Thread.currentThread().isInterrupted()) {
System.out.println("Callable 子線程計(jì)算任務(wù)中斷!");
return null;
}
sum = sum + i;
System.out.println("sum=" + sum);
}
System.out.println("Callable 子線程計(jì)算結(jié)束!結(jié)果為: " + sum);
return sum;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CallableTest callableTest = new CallableTest();
// 包裝
FutureTask<Integer> futureTask = new FutureTask<>(callableTest);
new Thread(futureTask).start();
Random r = new Random();
if (r.nextInt(100) > 50) {
// 如果r.nextInt(100) > 50則計(jì)算返回結(jié)果
System.out.println("sum = " + futureTask.get());
} else {
// 如果r.nextInt(100) <= 50則取消計(jì)算
System.out.println("Cancel...");
futureTask.cancel(true);
}
}
}
都讀到這里了,來(lái)個(gè) 點(diǎn)贊、評(píng)論、關(guān)注、收藏 吧!
文章作者:IT王小二
首發(fā)地址:https://www.itwxe.com/posts/e4f648cd/
版權(quán)聲明:文章內(nèi)容遵循 署名-非商業(yè)性使用-禁止演繹 4.0 國(guó)際 進(jìn)行許可,轉(zhuǎn)載請(qǐng)?jiān)谖恼马?yè)面明顯位置給出作者與原文鏈接。
到此這篇關(guān)于Java線程的并發(fā)工具類實(shí)現(xiàn)原理解析的文章就介紹到這了,更多相關(guān)java線程并發(fā)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
淺談spring方法級(jí)參數(shù)校驗(yàn)(@Validated)
這篇文章主要介紹了淺談spring方法級(jí)參數(shù)校驗(yàn)(@Validated),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06
java如何拷貝復(fù)制對(duì)象和集合問(wèn)題
這篇文章主要介紹了java如何拷貝復(fù)制對(duì)象和集合問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-09-09
java實(shí)現(xiàn)計(jì)算器加法小程序(圖形化界面)
這篇文章主要介紹了Java實(shí)現(xiàn)圖形化界面的計(jì)算器加法小程序,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2020-05-05
解決idea update project 更新選項(xiàng)消失的問(wèn)題
這篇文章主要介紹了解決idea update project 更新選項(xiàng)消失的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-01-01
Android bdflow數(shù)據(jù)庫(kù)神器的使用
這篇文章主要介紹了Android bdflow數(shù)據(jù)庫(kù)神器的使用,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-03-03
出現(xiàn)SLF4J:?Failed?to?load?class?“org.slf4j.impl.StaticLog
本文主要介紹了出現(xiàn)SLF4J:?Failed?to?load?class?“org.slf4j.impl.StaticLoggerBinder“.的解決方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07
java中ExecutorService創(chuàng)建方法總結(jié)
在本篇文章里小編給大家整理了一篇關(guān)于java中ExecutorService創(chuàng)建方法總結(jié),有興趣的朋友們可以參考下。2021-01-01

