CyclicBarrier之多線程中的循環(huán)柵欄詳解
1. CyclicBarrier簡(jiǎn)介
現(xiàn)實(shí)生活中我們經(jīng)常會(huì)遇到這樣的情景,在進(jìn)行某個(gè)活動(dòng)前需要等待人全部都齊了才開(kāi)始。例如吃飯時(shí)要等全家人都上座了才動(dòng)筷子,旅游時(shí)要等全部人都到齊了才出發(fā),比賽時(shí)要等運(yùn)動(dòng)員都上場(chǎng)后才開(kāi)始。
在JUC包中為我們提供了一個(gè)同步工具類(lèi)能夠很好的模擬這類(lèi)場(chǎng)景,它就是CyclicBarrier類(lèi)。利用CyclicBarrier類(lèi)可以實(shí)現(xiàn)一組線程相互等待,當(dāng)所有線程都到達(dá)某個(gè)屏障點(diǎn)(柵欄)后再進(jìn)行后續(xù)的操作。下圖演示了這一過(guò)程:

CyclicBarrier可以使一定數(shù)量的線程反復(fù)地在柵欄(不同輪次或不同代)位置處匯集。
- CyclicBarrier字面意思是“可重復(fù)使用的柵欄”,CyclicBarrier 和 CountDownLatch 很像,只是 CyclicBarrier 可以有不止一個(gè)柵欄,因?yàn)樗臇艡冢˙arrier)可以重復(fù)使用(Cyclic)。
- 當(dāng)線程到達(dá)柵欄位置時(shí)將調(diào)用await()方法,這個(gè)方法將阻塞(當(dāng)前線程)直到所有線程都到達(dá)柵欄位置。如果所有線程都到達(dá)柵欄位置,那么柵欄將打開(kāi),此時(shí)所有的線程都將被釋放,而柵欄將被重置以便下次使用。
2.CyclicBarrier的使用
2.1 常用方法
//參數(shù)parties:表示要到達(dá)屏障 (柵欄)的線程數(shù)量
//參數(shù)Runnable: 最后一個(gè)線程到達(dá)屏障之后要做的任務(wù)
?
//構(gòu)造方法1
public CyclicBarrier(int parties)
//構(gòu)造方法2
public CyclicBarrier(int parties, Runnable barrierAction)
?
//線程調(diào)用await()方法表示當(dāng)前線程已經(jīng)到達(dá)柵欄,然后會(huì)被阻塞
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
?
//帶時(shí)限的阻塞等待
public int await(long timeout, TimeUnit unit) throws InterruptedException,BrokenBarrierException,TimeoutException {
return dowait(true, unit.toNanos(timeout));
}2.2 使用舉例
適用場(chǎng)景:可用于需要多個(gè)線程均到達(dá)某一步之后才能繼續(xù)往下執(zhí)行的場(chǎng)景
//循環(huán)柵欄-可多次循環(huán)使用
CyclicBarrier cyclicBarrier = new CyclicBarrier(5,()->{
System.out.println(Thread.currentThread().getName()+" 完成最后任務(wù)!");
});
?
IntStream.range(1,6).forEach(i->{
new Thread(()->{
try {
Thread.sleep(new Double(Math.random()*3000).longValue());
System.out.println(Thread.currentThread().getName()+" 到達(dá)柵欄A");
cyclicBarrier.await();//屏障點(diǎn)A,當(dāng)前線程會(huì)阻塞至此,等待計(jì)數(shù)器=0
System.out.println(Thread.currentThread().getName()+" 沖破柵欄A");
}catch (Exception e){
e.printStackTrace();
}
}).start();
});3.CyclicBarrier原理
CyclicBarrier是一道屏障,調(diào)用await()方法后,當(dāng)前線程進(jìn)入阻塞,當(dāng)parties數(shù)量的線程調(diào)用await方法后,所有的await方法會(huì)返回并繼續(xù)往下執(zhí)行。
3.1 成員變量
/** CyclicBarrier使用的排他鎖*/
private final ReentrantLock lock = new ReentrantLock();
/** barrier被沖破前,線程等待的condition*/
private final Condition trip = lock.newCondition();
/** barrier被沖破時(shí),需要滿(mǎn)足的參與線程數(shù)。*/
private final int parties;
/* barrier被沖破后執(zhí)行的方法。*/
private final Runnable barrierCommand;
/** 當(dāng)其輪次 */
private Generation generation = new Generation();
?
/**
*目前等待剩余的參與者數(shù)量。從 parties倒數(shù)到0。每個(gè)輪次該值會(huì)被重置回parties
*/
private int count;(1)CyclicBarrier內(nèi)部是通過(guò)條件隊(duì)列trip來(lái)對(duì)線程進(jìn)行阻塞的,并且其內(nèi)部維護(hù)了兩個(gè)int型的變量parties和count。
- parties表示每次攔截的線程數(shù),該值在構(gòu)造時(shí)進(jìn)行賦值。
- count是內(nèi)部計(jì)數(shù)器,它的初始值和parties相同,以后隨著每次await方法的調(diào)用而減1,直到減為0就將所有線程喚醒。
(2)CyclicBarrier有一個(gè)靜態(tài)內(nèi)部類(lèi)Generation,該類(lèi)的對(duì)象代表柵欄的當(dāng)前代,就像玩游戲時(shí)代表的本局游戲,利用它可以實(shí)現(xiàn)循環(huán)等待
(3)barrierCommand表示換代前執(zhí)行的任務(wù),當(dāng)count減為0時(shí)表示本局游戲結(jié)束,需要轉(zhuǎn)到下一局。在轉(zhuǎn)到下一局游戲之前會(huì)將所有阻塞的線程喚醒,在喚醒所有線程之前你可以通過(guò)指定barrierCommand來(lái)執(zhí)行自己的任務(wù)。
3.2 構(gòu)造器
//構(gòu)造器1:指定本局要攔截的線程數(shù)parties 及 本局結(jié)束時(shí)要執(zhí)行的任務(wù)
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
?
//構(gòu)造器2
public CyclicBarrier(int parties) {
this(parties, null);
}3.3 等待的方法
CyclicBarrier類(lèi)最主要的功能就是使先到達(dá)屏障點(diǎn)的線程阻塞并等待后面的線程,其中它提供了兩種等待的方法,分別是定時(shí)等待和非定時(shí)等待。源代碼中await()方法最終調(diào)用的是dowait()方法:
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException,TimeoutException {
// 獲取獨(dú)占鎖
final ReentrantLock lock = this.lock;
lock.lock();//對(duì)共享資源count,generation操作前,需先上鎖保證線程安全
try {
// 當(dāng)前代--當(dāng)前輪次對(duì)象的引用
final Generation g = generation;
// 如果這輪次broken了,拋出異常
if (g.broken)
throw new BrokenBarrierException();
// 如果線程中斷了,拋出異常
if (Thread.interrupted()) {
breakBarrier();//如果被打斷,通過(guò)此方法設(shè)置當(dāng)前輪次為broken狀態(tài),通知當(dāng)前輪次所有等待的線程
throw new InterruptedException();//拋出異常
}
//自旋前
//1、count值-1
int index = --count;
// 2、判斷是否到0,若是,則沖破屏障點(diǎn)(說(shuō)明最后一個(gè)線程已經(jīng)到達(dá))
if (index == 0) {
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
// 3、執(zhí)行柵欄任務(wù)(若CyclicBarrier構(gòu)造時(shí)傳入了Runnable,則調(diào)用)
if (command != null)
command.run();
ranAction = true;
// 4、更新一輪次,將count重置,將generation重置,喚醒之前等待的線程
nextGeneration();
return 0;
} finally {
// 如果執(zhí)行柵欄任務(wù)(command)的時(shí)候出現(xiàn)了異常,那么就認(rèn)為本輪次破環(huán)了
if (!ranAction)
breakBarrier();
}
}
//計(jì)數(shù)器沒(méi)有到0 =》開(kāi)始自旋,直到屏障被沖破,或者interrupted或者超時(shí)
for (;;) {
try {
// 開(kāi)始等待;如果沒(méi)有時(shí)間限制,則直接等待,直到被喚醒(讓其他線程進(jìn)入到lock的代碼塊執(zhí)行以上邏輯)
if (!timed)
trip.await();//阻塞,此時(shí)會(huì)釋放鎖,以讓其他線程進(jìn)入await方法中,等待屏障被沖破后,向后執(zhí)行
// 如果有時(shí)間限制,則等待指定時(shí)間
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 如果當(dāng)前線程阻塞被interrupt了,并且本輪次還沒(méi)有被break,那么修改本輪次狀態(tài)為broken
if (g == generation && ! g.broken) {
// 讓柵欄失效
breakBarrier();
throw ie;
} else {
// 上面條件不滿(mǎn)足,說(shuō)明這個(gè)線程不是這輪次的,就不會(huì)影響當(dāng)前這代柵欄的執(zhí)行,所以,就打個(gè)中斷標(biāo)記
Thread.currentThread().interrupt();
}
}
// 當(dāng)有任何一個(gè)線程中斷了,就會(huì)調(diào)用breakBarrier方法,就會(huì)喚醒其他的線程,其他線程醒來(lái)后,也要拋出異常
if (g.broken)
throw new BrokenBarrierException();
// g != generation表示正常換輪次了,返回當(dāng)前線程所在柵欄的下標(biāo)
// 如果 g == generation,說(shuō)明還沒(méi)有換,那為什么會(huì)醒了?
// 因?yàn)橐粋€(gè)線程可以使用多個(gè)柵欄,當(dāng)別的柵欄喚醒了這個(gè)線程,就會(huì)走到這里,所以需要判斷是否是當(dāng)前代。
// 正是因?yàn)檫@個(gè)原因,才需要generation來(lái)保證正確。
if (g != generation)
return index;
// 如果有時(shí)間限制,且時(shí)間小于等于0,銷(xiāo)毀柵欄并拋出異常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}//自旋
} finally {
// 釋放獨(dú)占鎖
lock.unlock();
}
}更新本輪次的方法:nextGeneration()
private void nextGeneration() {
trip.signalAll();//喚醒本輪次等待的線程
count = parties;//重置count值為初始值,為下一輪次(代)使用
generation = new Generation();//更新本輪次對(duì)象,這樣自旋中的線程才會(huì)跳出自旋。
}
?
private static class Generation {
boolean broken = false;
}
?
private void breakBarrier() {
generation.broken = true;//設(shè)置標(biāo)識(shí)
count = parties;//重置count值為初始值
trip.signalAll();//喚醒所有等待線程
}總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
IDEA 自定義方法注解模板的實(shí)現(xiàn)方法
這篇文章主要介紹了IDEA 自定義方法注解模板的實(shí)現(xiàn)方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-09-09
xxl-job定時(shí)任務(wù)配置應(yīng)用及添加到springboot項(xiàng)目中實(shí)現(xiàn)動(dòng)態(tài)API調(diào)用
XXL-JOB是一個(gè)分布式任務(wù)調(diào)度平臺(tái),其核心設(shè)計(jì)目標(biāo)是開(kāi)發(fā)迅速、學(xué)習(xí)簡(jiǎn)單、輕量級(jí)、易擴(kuò)展,本篇文章主要是對(duì)xuxueli的xxl-job做一個(gè)簡(jiǎn)單的配置,以及將其添加到自己已有的項(xiàng)目中進(jìn)行api調(diào)用,感興趣的朋友跟隨小編一起看看吧2024-04-04
深度解析Spring AI請(qǐng)求與響應(yīng)機(jī)制的核心邏輯
我們?cè)谇懊娴膬蓚€(gè)章節(jié)中基本上對(duì)Spring Boot 3版本的新變化進(jìn)行了全面的回顧,以確保在接下來(lái)研究Spring AI時(shí)能夠避免任何潛在的問(wèn)題,本文給大家介紹Spring AI請(qǐng)求與響應(yīng)機(jī)制的核心邏輯,感興趣的朋友跟隨小編一起看看吧2024-11-11
SpringMVC 重定向參數(shù)RedirectAttributes實(shí)例
這篇文章主要介紹了SpringMVC 重定向參數(shù)RedirectAttributes實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12
里氏代換原則_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
這篇文章主要為大家詳細(xì)介紹了里氏代換原則的相關(guān)資料,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-08-08
詳解微信開(kāi)發(fā)之Author網(wǎng)頁(yè)授權(quán)
微信開(kāi)發(fā)中,經(jīng)常有這樣的需求:獲得用戶(hù)頭像、綁定微信號(hào)給用戶(hù)發(fā)信息,那么實(shí)現(xiàn)這些的前提就是授權(quán)!本文對(duì)此進(jìn)行系統(tǒng)介紹,需要的朋友一起來(lái)看下吧2016-12-12
c語(yǔ)言來(lái)實(shí)現(xiàn)貪心算法之裝箱問(wèn)題
這篇文章主要介紹了c語(yǔ)言來(lái)實(shí)現(xiàn)貪心算法之裝箱問(wèn)題,需要的朋友可以參考下2015-03-03
springboot實(shí)現(xiàn)mock平臺(tái)的示例代碼
本文主要介紹了springboot實(shí)現(xiàn)mock平臺(tái)的示例代碼,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-06-06

