JUC循環(huán)屏障CyclicBarrier與CountDownLatch區(qū)別詳解
前言
jdk中提供了許多的并發(fā)工具類(lèi),大家可能比較熟悉的有CountDownLatch,主要用來(lái)阻塞一個(gè)線(xiàn)程運(yùn)行,直到其他線(xiàn)程運(yùn)行完畢。而jdk還有一個(gè)功能類(lèi)似并發(fā)工具類(lèi)CyclicBarrier,你知道它的作用嗎?和CountDownLatch有什么區(qū)別呢?
對(duì)于CountDownLatch不了解的可以參考# CountDownLatch源碼硬核解析
介紹和使用
CyclicBarrier,循環(huán)屏障,用來(lái)進(jìn)行線(xiàn)程協(xié)作,讓一組線(xiàn)程到達(dá)一個(gè)屏障(也可以叫同步點(diǎn))時(shí)被阻塞,直到最后一個(gè)線(xiàn)程到達(dá)屏障時(shí),會(huì)觸發(fā)自己運(yùn)行,運(yùn)行完后,屏障又會(huì)開(kāi)門(mén),所有被屏障攔截的線(xiàn)程又可以繼續(xù)運(yùn)行。所以CyclicBarrier 是可以重用的。
為了更好的理解,我們舉個(gè)例子,如下圖所示:

我們將屏障想成柵欄,5個(gè)線(xiàn)程想成5頭豬。5頭豬開(kāi)始往前跑,直到都跑到柵欄前,柵欄開(kāi)始做個(gè)自己的任務(wù),比如看看豬多重。然后打開(kāi)柵欄,豬又會(huì)繼續(xù)跑,跑到下一個(gè)柵欄,就這樣循環(huán)....
API介紹
構(gòu)造方法
public CyclicBarrier(int parties): 創(chuàng)建parties個(gè)線(xiàn)程任務(wù)的循環(huán)CyclicBarrierpublic CyclicBarrier(int parties, Runnable barrierAction): 當(dāng)parties個(gè)線(xiàn)程到到屏障出,自己執(zhí)行任務(wù)barrierAction
常用API
int await():線(xiàn)程調(diào)用await方法通知CyclicBarrier本線(xiàn)程已經(jīng)到達(dá)屏障
基本使用
我們將上面豬豬的例子通過(guò)CyclicBarrier簡(jiǎn)單做一個(gè)實(shí)現(xiàn)。
@Slf4j(topic = "c.CyclicBarrierPig")
public class CyclicBarrierPig {
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(5);
CyclicBarrier barrier = new CyclicBarrier(5, () -> {
System.out.println("主人看看哪個(gè)豬跑最快,最肥...");
});
// 循環(huán)跑3次
for (int i = 0; i < 3; i++) {
// 5條豬開(kāi)始跑
for(int j = 0; j<5; j++) {
int finalJ = j;
service.submit(() -> {
log.info("pig{} is run .....", finalJ);
try {
// 隨機(jī)時(shí)間,模擬跑花費(fèi)的時(shí)間
Thread.sleep(new Random().nextInt(5000));
log.info("pig{} reach barrier .....", finalJ);
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}
}
service.shutdown();
}
}
運(yùn)行結(jié)果:

實(shí)現(xiàn)原理
我們已經(jīng)明白CyclicBarrier的基本使用了,那我們看看它是如何實(shí)現(xiàn)的。
成員屬性
- 全局鎖:利用可重入鎖實(shí)現(xiàn)的工具類(lèi)
// barrier 實(shí)現(xiàn)是依賴(lài)于Condition條件隊(duì)列,condition 條件隊(duì)列必須依賴(lài)lock才能使用 private final ReentrantLock lock = new ReentrantLock(); // 線(xiàn)程掛起實(shí)現(xiàn)使用的 condition 隊(duì)列,當(dāng)前代所有線(xiàn)程到位,這個(gè)條件隊(duì)列內(nèi)的線(xiàn)程才會(huì)被喚醒 private final Condition trip = lock.newCondition();
- 線(xiàn)程數(shù)量
// 代表多少個(gè)線(xiàn)程到達(dá)屏障開(kāi)始觸發(fā)線(xiàn)程任務(wù) private final int parties; // 表示當(dāng)前“代”還有多少個(gè)線(xiàn)程未到位,初始值為 parties private int count;
- 當(dāng)前代中最后一個(gè)線(xiàn)程到位后要執(zhí)行的任務(wù)
private final Runnable barrierCommand;
- 代, 也是用標(biāo)記一次循環(huán)
// 表示 barrier 對(duì)象當(dāng)前 代
private Generation generation = new Generation();
private static class Generation {
// 表示當(dāng)前“代”是否被打破,如果被打破再來(lái)到這一代的線(xiàn)程 就會(huì)直接拋出 BrokenException 異常
// 且在這一代掛起的線(xiàn)程都會(huì)被喚醒,然后拋出 BrokerException 異常。
boolean broken = false;
}
構(gòu)造方法
public CyclicBarrie(int parties, Runnable barrierAction) {
// 因?yàn)樾∮诘扔?0 的 barrier 沒(méi)有任何意義
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
// 可以為 null
this.barrierCommand = barrierAction;
}
成員方法
await():阻塞等待所有線(xiàn)程到位
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
// timed:表示當(dāng)前調(diào)用await方法的線(xiàn)程是否指定了超時(shí)時(shí)長(zhǎng),如果 true 表示線(xiàn)程是響應(yīng)超時(shí)的
// nanos:線(xiàn)程等待超時(shí)時(shí)長(zhǎng),單位是納秒
private int dowait(boolean timed, long nanos) {
final ReentrantLock lock = this.lock;
// 加鎖
lock.lock();
try {
// 獲取當(dāng)前代
final Generation g = generation;
// 【如果當(dāng)前代是已經(jīng)被打破狀態(tài),則當(dāng)前調(diào)用await方法的線(xiàn)程,直接拋出Broken異?!?
if (g.broken)
throw new BrokenBarrierException();
// 如果當(dāng)前線(xiàn)程被中斷了,則打破當(dāng)前代,然后當(dāng)前線(xiàn)程拋出中斷異常
if (Thread.interrupted()) {
// 設(shè)置當(dāng)前代的狀態(tài)為 broken 狀態(tài),喚醒在 trip 條件隊(duì)列內(nèi)的線(xiàn)程
breakBarrier();
throw new InterruptedException();
}
// 邏輯到這說(shuō)明,當(dāng)前線(xiàn)程中斷狀態(tài)是 false, 當(dāng)前代的 broken 為 false(未打破狀態(tài))
// 假設(shè) parties 給的是 5,那么index對(duì)應(yīng)的值為 4,3,2,1,0
int index = --count;
// 條件成立說(shuō)明當(dāng)前線(xiàn)程是最后一個(gè)到達(dá) barrier 的線(xiàn)程,【需要開(kāi)啟新代,喚醒阻塞線(xiàn)程】
if (index == 0) {
// 柵欄任務(wù)啟動(dòng)標(biāo)記
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
// 啟動(dòng)觸發(fā)的任務(wù)
command.run();
// run()未拋出異常的話(huà),啟動(dòng)標(biāo)記設(shè)置為 true
ranAction = true;
// 開(kāi)啟新的一代,這里會(huì)【喚醒所有的阻塞隊(duì)列】
nextGeneration();
// 返回 0 因?yàn)楫?dāng)前線(xiàn)程是此代最后一個(gè)到達(dá)的線(xiàn)程,index == 0
return 0;
} finally {
// 如果 command.run() 執(zhí)行拋出異常的話(huà),會(huì)進(jìn)入到這里
if (!ranAction)
breakBarrier();
}
}
// 自旋,一直到條件滿(mǎn)足、當(dāng)前代被打破、線(xiàn)程被中斷,等待超時(shí)
for (;;) {
try {
// 根據(jù)是否需要超時(shí)等待選擇阻塞方法
if (!timed)
// 當(dāng)前線(xiàn)程釋放掉 lock,【進(jìn)入到 trip 條件隊(duì)列的尾部掛起自己】,等待被喚醒
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 被中斷后來(lái)到這里的邏輯
// 當(dāng)前代沒(méi)有變化并且沒(méi)有被打破
if (g == generation && !g.broken) {
// 打破屏障
breakBarrier();
// node 節(jié)點(diǎn)在【條件隊(duì)列】?jī)?nèi)收到中斷信號(hào)時(shí) 會(huì)拋出中斷異常
throw ie;
} else {
// 等待過(guò)程中代變化了,完成一次自我打斷
Thread.currentThread().interrupt();
}
}
// 喚醒后的線(xiàn)程,【判斷當(dāng)前代已經(jīng)被打破,線(xiàn)程喚醒后依次拋出 BrokenBarrier 異?!?
if (g.broken)
throw new BrokenBarrierException();
// 當(dāng)前線(xiàn)程掛起期間,最后一個(gè)線(xiàn)程到位了,然后觸發(fā)了開(kāi)啟新的一代的邏輯
if (g != generation)
return index;
// 當(dāng)前線(xiàn)程 trip 中等待超時(shí),然后主動(dòng)轉(zhuǎn)移到阻塞隊(duì)列
if (timed && nanos <= 0L) {
breakBarrier();
// 拋出超時(shí)異常
throw new TimeoutException();
}
}
} finally {
// 解鎖
lock.unlock();
}
}
- breakBarrier():打破 Barrier 屏障
private void breakBarrier() {
// 將代中的 broken 設(shè)置為 true,表示這一代是被打破了,再來(lái)到這一代的線(xiàn)程,直接拋出異常
generation.broken = true;
// 重置 count 為 parties
count = parties;
// 將在trip條件隊(duì)列內(nèi)掛起的線(xiàn)程全部喚醒,喚醒后的線(xiàn)程會(huì)檢查當(dāng)前是否是打破的,然后拋出異常
trip.signalAll();
}
- nextGeneration():開(kāi)啟新的下一代
private void nextGeneration() {
// 將在 trip 條件隊(duì)列內(nèi)掛起的線(xiàn)程全部喚醒
trip.signalAll();
// 重置 count 為 parties
count = parties;
// 開(kāi)啟新的一代,使用一個(gè)新的generation對(duì)象,表示新的一代,新的一代和上一代【沒(méi)有任何關(guān)系】
generation = new Generation();
}
和CountDownLatch的區(qū)別
相同點(diǎn)
二者都能讓一個(gè)或多個(gè)線(xiàn)程阻塞等待,都可以用在多個(gè)線(xiàn)程間的協(xié)調(diào),起到線(xiàn)程同步的作用。
不同點(diǎn)
CountDownLatch的計(jì)數(shù)器只能使用一次,而CyclicBarrier的計(jì)數(shù)器可以反復(fù) 使用。CountDownLatch.await一般阻塞工作線(xiàn)程,所有的進(jìn)行預(yù)備工作的線(xiàn)程執(zhí)行countDown,而CyclicBarrier通過(guò)工作線(xiàn)程調(diào)用await從而自行阻塞,直到所有工作線(xiàn)程達(dá)到指定屏障,所有的線(xiàn)程才會(huì)返回各自執(zhí)行自己的工作。CyclicBarrier還可以提供一個(gè)barrierAction,合并多線(xiàn)程計(jì)算結(jié)果。CountDownLatch會(huì)阻塞主線(xiàn)程,CyclicBarrier不會(huì)阻塞主線(xiàn)程,只會(huì)阻塞子線(xiàn)程。
總結(jié)
本文講解了CyclicBarrier 的基本功能和使用,同時(shí)講解了它大致的實(shí)現(xiàn)。關(guān)于CyclicBarrier 具體有什么使用場(chǎng)景,你可能還是比較迷惑,我再舉個(gè)例子,比如CyclicBarrier 可以用于多線(xiàn)程計(jì)算數(shù)據(jù),最后合并計(jì)算結(jié)果的應(yīng)用場(chǎng)景,更多關(guān)于JUC循環(huán)屏障的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java函數(shù)式編程(十二):監(jiān)控文件修改
這篇文章主要介紹了Java函數(shù)式編程(十二):監(jiān)控文件修改,本文是系列文章的第12篇,其它文章請(qǐng)參閱本文底部的相關(guān)文章,需要的朋友可以參考下2014-09-09
servlet監(jiān)聽(tīng)實(shí)現(xiàn)統(tǒng)計(jì)在線(xiàn)人數(shù)功能 附源碼下載
這篇文章主要為大家詳細(xì)介紹了servlet監(jiān)聽(tīng)統(tǒng)計(jì)在線(xiàn)人數(shù)的實(shí)現(xiàn)方法,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-04-04
SpringBoot獲取當(dāng)前運(yùn)行環(huán)境三種方式小結(jié)
在使用SpringBoot過(guò)程中,我們只需要引入相關(guān)依賴(lài),然后在main方法中調(diào)用SpringBootApplication.run(應(yīng)用程序啟動(dòng)類(lèi).class)方法即可,那么SpringBoot是如何獲取當(dāng)前運(yùn)行環(huán)境呢,接下來(lái)由小編給大家介紹一下SpringBoot獲取當(dāng)前運(yùn)行環(huán)境三種方式,需要的朋友可以參考下2024-01-01
如何通過(guò)zuul添加或修改請(qǐng)求參數(shù)
這篇文章主要介紹了如何通過(guò)zuul添加或修改請(qǐng)求參數(shù)的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07
JDK8新特性-java.util.function-Function接口使用
這篇文章主要介紹了JDK8新特性-java.util.function-Function接口使用,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-04-04
SpringBoot項(xiàng)目yml配置文件不自動(dòng)提示解決方案
這篇文章主要介紹了SpringBoot項(xiàng)目配置文件.yaml/.yml文件編寫(xiě)時(shí)沒(méi)有自動(dòng)提示的解決方案,文章通過(guò)圖文結(jié)合的方式給大家講解的非常詳細(xì),需要的朋友可以參考下2024-06-06
Java IO流體系繼承結(jié)構(gòu)圖_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
這篇文章主要介紹了Java IO流體系繼承結(jié)構(gòu)圖,非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友可以參考下2017-05-05

