AQS同步組件CyclicBarrier循環(huán)屏障用例剖析
CyclicBarrier原理
CyclicBarrier 的字面意思是可循環(huán)使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達(dá)一個屏障(也可以叫同步點)時被阻塞,直到最后一個線程到達(dá)屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續(xù)運行。
當(dāng)某個線程調(diào)用了await方法之后,就會進(jìn)入等待狀態(tài),并將計數(shù)器+1,直到所有線程調(diào)用await方法使計數(shù)器為CyclicBarrier設(shè)置的值,才可以繼續(xù)執(zhí)行,由于計數(shù)器可以重復(fù)使用,所以我們又叫它循環(huán)屏障。
CyclicBarrier默認(rèn)的構(gòu)造方法是CyclicBarrier(int parties),其參數(shù)表示屏障攔截的線程數(shù)量,每個線程調(diào)用await方法告訴CyclicBarrier我已經(jīng)到達(dá)了屏障,然后當(dāng)前線程被阻塞。
CyclicBarrier可以用于多線程計算數(shù)據(jù),最后合并計算結(jié)果的應(yīng)用場景。
源碼分析
/**
* 創(chuàng)建一個新的CyclicBarrier當(dāng)給定數(shù)量的參與方(線程)等待它時,它將觸發(fā),
* 并且在障礙觸發(fā)時不執(zhí)行預(yù)定義的操作。
*
* @param 在barrier被觸發(fā)之前必須調(diào)用await()的線程數(shù)
* @throws IllegalArgumentException 如果parties小于1拋出異常
*/
public CyclicBarrier(int parties) {
this(parties, null);
}
/**
*
* 當(dāng)前線程調(diào)用await方法的線程告知CyclicBarrier已經(jīng)到達(dá)屏障,然后當(dāng)前線程被阻塞
*
* @return 當(dāng)前線程的到達(dá)索引,其中索引為- 1表示第一個到達(dá)的,0表示最后一個到達(dá)的
* @throws InterruptedException 如果當(dāng)前線程在等待時被中斷
* @throws BrokenBarrierException 如果另一個線程在當(dāng)前線程等待時被中斷或超時,
* 或者屏障被重置,或者在調(diào)用await方法時屏障被破壞,或者屏障操作(如果存在)由于異常而失敗
*/
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
使用案例
await()
/**
* 線程數(shù)量
*/
private final static int threadCount = 15;
/**
* 屏障攔截的線程數(shù)量為5,表示每次屏障會攔截5個線程
*/
private static CyclicBarrier barrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready {}", threadNum,barrier.getNumberWaiting());
//每次調(diào)用await方法后計數(shù)器+1,當(dāng)前線程被阻塞
barrier.await();
log.info("{} continue", threadNum);
}
輸出結(jié)果:
16:16:40.245 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 0 is ready 0
16:16:41.244 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 1 is ready 1
16:16:42.244 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 2 is ready 2
16:16:43.244 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 3 is ready 3
16:16:44.245 [pool-1-thread-5] INFO com.zjq.aqs.CyclicBarrier - 4 is ready 4
16:16:44.245 [pool-1-thread-5] INFO com.zjq.aqs.CyclicBarrier - 4 continue
16:16:44.245 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 0 continue
16:16:44.245 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 1 continue
16:16:44.245 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 2 continue
16:16:44.245 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 3 continue
16:16:45.245 [pool-1-thread-6] INFO com.zjq.aqs.CyclicBarrier - 5 is ready 0
16:16:46.245 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 6 is ready 1
16:16:47.246 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 7 is ready 2
16:16:48.246 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 8 is ready 3
16:16:49.246 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 9 is ready 4
16:16:49.246 [pool-1-thread-6] INFO com.zjq.aqs.CyclicBarrier - 5 continue
16:16:49.246 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 9 continue
16:16:49.246 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 6 continue
16:16:49.246 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 8 continue
16:16:49.246 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 7 continue
16:16:50.247 [pool-1-thread-5] INFO com.zjq.aqs.CyclicBarrier - 10 is ready 0
16:16:51.247 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 11 is ready 1
16:16:52.247 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 12 is ready 2
16:16:53.248 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 13 is ready 3
16:16:54.248 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 14 is ready 4
16:16:54.248 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 14 continue
16:16:54.248 [pool-1-thread-5] INFO com.zjq.aqs.CyclicBarrier - 10 continue
16:16:54.248 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 12 continue
16:16:54.248 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 11 continue
16:16:54.248 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 13 continue
通過輸出結(jié)果可以知道,每次屏障會阻塞5個線程,5個線程執(zhí)行后計數(shù)器達(dá)到預(yù)設(shè)值,繼續(xù)執(zhí)行后續(xù)操作。
await(long timeout, TimeUnit unit)
/**
* 線程數(shù)量
*/
private final static int threadCount = 15;
/**
* 屏障攔截的線程數(shù)量為5,表示每次屏障會攔截5個線程
*/
private static CyclicBarrier barrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready{}", threadNum,barrier.getNumberWaiting());
//每次調(diào)用await方法后計數(shù)器+1,當(dāng)前線程被阻塞
//等待2s.為了使在發(fā)生異常的時候,不影響其他線程,一定要catch
//由于設(shè)置了超時時間后阻塞的線程可能會被中斷,拋出BarrierException異常,如果想繼續(xù)往下執(zhí)行,需要加上try-catch
try {
barrier.await(2, TimeUnit.SECONDS);
}catch (Exception e){
//查看執(zhí)行異常的線程
log.info("線程{} 執(zhí)行異常,阻塞被中斷?{}",threadNum,barrier.isBroken());
}
log.info("{} continue", threadNum);
}
輸出結(jié)果:
17:06:24.440 [pool-1-thread-1] INFO com.zjq.CyclicBarrier - 0 is ready0
17:06:25.435 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 1 is ready1
17:06:26.435 [pool-1-thread-3] INFO com.zjq.CyclicBarrier - 2 is ready2
17:06:26.455 [pool-1-thread-1] INFO com.zjq.CyclicBarrier - 線程0 執(zhí)行異常,阻塞被中斷?true
17:06:26.456 [pool-1-thread-3] INFO com.zjq.CyclicBarrier - 線程2 執(zhí)行異常,阻塞被中斷?true
17:06:26.456 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 線程1 執(zhí)行異常,阻塞被中斷?true
17:06:26.456 [pool-1-thread-1] INFO com.zjq.CyclicBarrier - 0 continue
17:06:26.456 [pool-1-thread-3] INFO com.zjq.CyclicBarrier - 2 continue
17:06:26.456 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 1 continue
17:06:27.434 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 3 is ready0
17:06:27.434 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 線程3 執(zhí)行異常,阻塞被中斷?true
17:06:27.434 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 3 continue
17:06:28.435 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 4 is ready0
17:06:28.435 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 線程4 執(zhí)行異常,阻塞被中斷?true
17:06:28.435 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 4 continue
17:06:29.435 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 5 is ready0
17:06:29.435 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 線程5 執(zhí)行異常,阻塞被中斷?true
17:06:29.435 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 5 continue
17:06:30.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 6 is ready0
17:06:30.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 線程6 執(zhí)行異常,阻塞被中斷?true
17:06:30.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 6 continue
17:06:31.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 7 is ready0
17:06:31.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 線程7 執(zhí)行異常,阻塞被中斷?true
17:06:31.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 7 continue
17:06:32.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 8 is ready0
17:06:32.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 線程8 執(zhí)行異常,阻塞被中斷?true
17:06:32.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 8 continue
17:06:33.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 9 is ready0
17:06:33.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 線程9 執(zhí)行異常,阻塞被中斷?true
17:06:33.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 9 continue
CyclicBarrier(int parties, Runnable barrierAction)
/**
* 線程到達(dá)屏障時,優(yōu)先執(zhí)行barrierAction,方便處理更復(fù)雜的業(yè)務(wù)場景
*/
private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
log.info("callback is running");
});
輸出結(jié)果:
17:11:38.867 [pool-1-thread-1] INFO com.zjq.CyclicBarrier3 - 0 is ready
17:11:38.966 [pool-1-thread-2] INFO com.zjq.CyclicBarrier3 - 1 is ready
17:11:39.067 [pool-1-thread-3] INFO com.zjq.CyclicBarrier3 - 2 is ready
17:11:39.167 [pool-1-thread-4] INFO com.zjq.CyclicBarrier3 - 3 is ready
17:11:39.268 [pool-1-thread-5] INFO com.zjq.CyclicBarrier3 - 4 is ready
17:11:39.268 [pool-1-thread-5] INFO com.zjq.CyclicBarrier3 - callback is running
17:11:39.268 [pool-1-thread-5] INFO com.zjq.CyclicBarrier3 - 4 continue
17:11:39.268 [pool-1-thread-1] INFO com.zjq.CyclicBarrier3 - 0 continue
17:11:39.268 [pool-1-thread-2] INFO com.zjq.CyclicBarrier3 - 1 continue
17:11:39.268 [pool-1-thread-3] INFO com.zjq.CyclicBarrier3 - 2 continue
17:11:39.268 [pool-1-thread-4] INFO com.zjq.CyclicBarrier3 - 3 continue
17:11:39.369 [pool-1-thread-6] INFO com.zjq.CyclicBarrier3 - 5 is ready
17:11:39.470 [pool-1-thread-7] INFO com.zjq.CyclicBarrier3 - 6 is ready
17:11:39.570 [pool-1-thread-8] INFO com.zjq.CyclicBarrier3 - 7 is ready
17:11:39.671 [pool-1-thread-9] INFO com.zjq.CyclicBarrier3 - 8 is ready
17:11:39.772 [pool-1-thread-10] INFO com.zjq.CyclicBarrier3 - 9 is ready
17:11:39.772 [pool-1-thread-10] INFO com.zjq.CyclicBarrier3 - callback is running
17:11:39.772 [pool-1-thread-10] INFO com.zjq.CyclicBarrier3 - 9 continue
17:11:39.772 [pool-1-thread-6] INFO com.zjq.CyclicBarrier3 - 5 continue
17:11:39.772 [pool-1-thread-9] INFO com.zjq.CyclicBarrier3 - 8 continue
17:11:39.772 [pool-1-thread-7] INFO com.zjq.CyclicBarrier3 - 6 continue
17:11:39.772 [pool-1-thread-8] INFO com.zjq.CyclicBarrier3 - 7 continue
CyclicBarrier和CountDownLatch的區(qū)別
- CountDownLatch的計數(shù)器只能使用一次。而CyclicBarrier的計數(shù)器可以使用reset()方法重置。所以CyclicBarrier能處理更為復(fù)雜的業(yè)務(wù)場景,比如如果計算發(fā)生錯誤,可以重置計數(shù)器,并讓線程們重新執(zhí)行一次。
- CountDownLatch主要用于實現(xiàn)一個或n個線程需要等待其他線程完成某項操作之后,才能繼續(xù)往下執(zhí)行,描述的是一個或n個線程等待其他線程的關(guān)系,而CyclicBarrier是多個線程相互等待,知道滿足條件以后再一起往下執(zhí)行。描述的是多個線程相互等待的場景
- CyclicBarrier還提供其他有用的方法,比如getNumberWaiting方法可以獲得CyclicBarrier阻塞的線程數(shù)量。isBroken方法用來知道阻塞的線程是否被中斷。
以上就是AQS同步組件CyclicBarrier循環(huán)屏障用例剖析的詳細(xì)內(nèi)容,更多關(guān)于AQS同步組件CyclicBarrier的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
簡單談?wù)刯ava中final,finally,finalize的區(qū)別
Java中final、finally、finalize的區(qū)別與用法,困擾了不少學(xué)習(xí)者,下面我們就這個問題進(jìn)行一些探討,希望對大家的學(xué)習(xí)有所幫助。2016-05-05
Spring中三種常見Bean的初始化參數(shù)機制你了解嗎
在Spring框架中,Bean的實例化與初始化是一個復(fù)雜的過程,本文我們主要來聊一聊它的常見的三種機制:InitializingBean接口、BeanDefinitionRegistryPostProcessor接口和EnvironmentAware接口,感興趣的小伙伴可以了解下2023-11-11
SpringBoot整合Mybatis-Plus、Jwt實現(xiàn)登錄token設(shè)置
Spring Boot整合Mybatis-plus實現(xiàn)登錄常常需要使用JWT來生成用戶的token并設(shè)置用戶權(quán)限的攔截器,本文就來詳細(xì)的介紹一下,具有一定的參考價值,感興趣的可以了解一下2024-02-02

