詳解java CountDownLatch和CyclicBarrier在內(nèi)部實(shí)現(xiàn)和場(chǎng)景上的區(qū)別
前言
CountDownLatch和CyclicBarrier兩個(gè)同為java并發(fā)編程的重要工具類(lèi),它們?cè)谥T多多線程并發(fā)或并行場(chǎng)景中得到了廣泛的應(yīng)用。但兩者就其內(nèi)部實(shí)現(xiàn)和使用場(chǎng)景而言是各有所側(cè)重的。
內(nèi)部實(shí)現(xiàn)差異
前者更多依賴(lài)經(jīng)典的AQS機(jī)制和CAS機(jī)制來(lái)控制器內(nèi)部狀態(tài)的更迭和計(jì)數(shù)器本身的變化,而后者更多依靠可重入Lock等機(jī)制來(lái)控制其內(nèi)部并發(fā)安全性和一致性。
public class {
//Synchronization control For CountDownLatch.
//Uses AQS state to represent count.
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
... ...//
}
public class CyclicBarrier {
/**
* Each use of the barrier is represented as a generation instance.
* The generation changes whenever the barrier is tripped, or
* is reset. There can be many generations associated with threads
* using the barrier - due to the non-deterministic way the lock
* may be allocated to waiting threads - but only one of these
* can be active at a time (the one to which {@code count} applies)
* and all the rest are either broken or tripped.
* There need not be an active generation if there has been a break
* but no subsequent reset.
*/
private static class Generation {
boolean broken = false;
}
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** The number of parties */
private final int parties;
/* The command to run when tripped */
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation();
/**
* Number of parties still waiting. Counts down from parties to 0
* on each generation. It is reset to parties on each new
* generation or when broken.
*/
private int count;
/**
* Updates state on barrier trip and wakes up everyone.
* Called only while holding lock.
*/
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
/**
* Sets current barrier generation as broken and wakes up everyone.
* Called only while holding lock.
*/
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
... ... //
}
實(shí)戰(zhàn) - 展示各自的使用場(chǎng)景
/**
*類(lèi)說(shuō)明:共5個(gè)初始化子線程,6個(gè)閉鎖扣除點(diǎn),扣除完畢后,主線程和業(yè)務(wù)線程才能繼續(xù)執(zhí)行
*/
public class UseCountDownLatch {
static CountDownLatch latch = new CountDownLatch(6);
/*初始化線程*/
private static class InitThread implements Runnable{
public void run() {
System.out.println("Thread_"+Thread.currentThread().getId()
+" ready init work......");
latch.countDown();
for(int i =0;i<2;i++) {
System.out.println("Thread_"+Thread.currentThread().getId()
+" ........continue do its work");
}
}
}
/*業(yè)務(wù)線程等待latch的計(jì)數(shù)器為0完成*/
private static class BusiThread implements Runnable{
public void run() {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
for(int i =0;i<3;i++) {
System.out.println("BusiThread_"+Thread.currentThread().getId()
+" do business-----");
}
}
}
public static void main(String[] args) throws InterruptedException {
new Thread(new Runnable() {
public void run() {
SleepTools.ms(1);
System.out.println("Thread_"+Thread.currentThread().getId()
+" ready init work step 1st......");
latch.countDown();
System.out.println("begin step 2nd.......");
SleepTools.ms(1);
System.out.println("Thread_"+Thread.currentThread().getId()
+" ready init work step 2nd......");
latch.countDown();
}
}).start();
new Thread(new BusiThread()).start();
for(int i=0;i<=3;i++){
Thread thread = new Thread(new InitThread());
thread.start();
}
latch.await();
System.out.println("Main do ites work........");
}
}
/**
*類(lèi)說(shuō)明:共4個(gè)子線程,他們?nèi)客瓿晒ぷ骱?,交出自己結(jié)果,
*再被統(tǒng)一釋放去做自己的事情,而交出的結(jié)果被另外的線程拿來(lái)拼接字符串
*/
class UseCyclicBarrier {
private static CyclicBarrier barrier
= new CyclicBarrier(4,new CollectThread());
//存放子線程工作結(jié)果的容器
private static ConcurrentHashMap<String,Long> resultMap
= new ConcurrentHashMap<String,Long>();
public static void main(String[] args) {
for(int i=0;i<4;i++){
Thread thread = new Thread(new SubThread());
thread.start();
}
}
/*匯總的任務(wù)*/
private static class CollectThread implements Runnable{
@Override
public void run() {
StringBuilder result = new StringBuilder();
for(Map.Entry<String,Long> workResult:resultMap.entrySet()){
result.append("["+workResult.getValue()+"]");
}
System.out.println(" the result = "+ result);
System.out.println("do other business........");
}
}
/*相互等待的子線程*/
private static class SubThread implements Runnable{
@Override
public void run() {
long id = Thread.currentThread().getId();
resultMap.put(Thread.currentThread().getId()+"",id);
try {
Thread.sleep(1000+id);
System.out.println("Thread_"+id+" ....do something ");
barrier.await();
Thread.sleep(1000+id);
System.out.println("Thread_"+id+" ....do its business ");
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
兩者總結(jié)
1. Cyclicbarrier結(jié)果匯總的Runable線程可以重復(fù)被執(zhí)行,通過(guò)多次觸發(fā)await()方法,countdownlatch可以調(diào)用await()方法多次;cyclicbarrier若沒(méi)有結(jié)果匯總,則調(diào)用一次await()就夠了;
2. New cyclicbarrier(threadCount)的線程數(shù)必須與實(shí)際的用戶(hù)線程數(shù)一致;
3. 協(xié)調(diào)線程同時(shí)運(yùn)行:countDownLatch協(xié)調(diào)工作線程執(zhí)行,是由外面線程協(xié)調(diào);cyclicbarrier是由工作線程之間相互協(xié)調(diào)運(yùn)行;
4. 從構(gòu)造函數(shù)上看出:countDownlatch控制運(yùn)行的計(jì)數(shù)器數(shù)量和線程數(shù)沒(méi)有關(guān)系;cyclicbarrier構(gòu)造中傳入的線程數(shù)等于實(shí)際執(zhí)行線程數(shù);
5. countDownLatch在不能基于執(zhí)行子線程的運(yùn)行結(jié)果做處理,而cyclicbarrier可以;
6. 就使用場(chǎng)景而言,countdownlatch 更適用于框架加載前的一系列初始化工作等場(chǎng)景; cyclicbarrier更適用于需要多個(gè)用戶(hù)線程執(zhí)行后,將運(yùn)行結(jié)果匯總再計(jì)算等典型場(chǎng)景;
到此這篇關(guān)于詳解java CountDownLatch和CyclicBarrier在內(nèi)部實(shí)現(xiàn)和場(chǎng)景上的區(qū)別的文章就介紹到這了,更多相關(guān)java CountDownLatch和CyclicBarrier區(qū)別內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java中復(fù)雜的Synchronized關(guān)鍵字使用方法詳解
Synchronized關(guān)鍵字是一個(gè)種鎖,其有很多名字,例如重量級(jí)鎖、悲觀鎖、可重入鎖、、非公平、對(duì)象鎖等等,這篇文章主要給大家介紹了關(guān)于Java中復(fù)雜的Synchronized關(guān)鍵字使用方法的相關(guān)資料,需要的朋友可以參考下2024-01-01
Java RocketMQ 路由注冊(cè)與刪除的實(shí)現(xiàn)
這篇文章主要介紹了Java RocketMQ 路由注冊(cè)與刪除的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-11-11
Spring 開(kāi)發(fā)之組件賦值的實(shí)現(xiàn)方法
這篇文章主要介紹了Spring 開(kāi)發(fā)之組件賦值的實(shí)現(xiàn)方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-09-09
mybatis連接數(shù)據(jù)庫(kù)實(shí)現(xiàn)雙表查詢(xún)
本文主要介紹了mybatis連接數(shù)據(jù)庫(kù)實(shí)現(xiàn)雙表查詢(xún),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2024-09-09
使用Feign設(shè)置Token鑒權(quán)調(diào)用接口
這篇文章主要介紹了使用Feign設(shè)置Token鑒權(quán)調(diào)用接口,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03
Java?輪詢(xún)鎖使用時(shí)遇到問(wèn)題解決方案
這篇文章主要介紹了Java?輪詢(xún)鎖使用時(shí)遇到問(wèn)題解決方案,當(dāng)我們遇到死鎖之后,除了可以手動(dòng)重啟程序解決之外,還可以考慮使用順序鎖和輪詢(xún)鎖,但是過(guò)程也會(huì)遇到一些問(wèn)題,接下來(lái)我們一起進(jìn)入下面文章了解解決方案,需要的小伙伴可以參考一下2022-05-05
java圖論弗洛伊德和迪杰斯特拉算法解決最短路徑問(wèn)題
這篇文章主要為大家介紹了java圖論弗洛伊德算法和迪杰斯特拉算法解決最短路徑的問(wèn)題示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助2021-11-11
java 流操作對(duì)文件的分割和合并的實(shí)例詳解
這篇文章主要介紹了java 流操作對(duì)文件的分割和合并的實(shí)例詳解的相關(guān)資料,希望通過(guò)本文能幫助到大家,讓大家理解掌握這部分內(nèi)容,需要的朋友可以參考下2017-10-10
Druid連接池未關(guān)閉導(dǎo)致內(nèi)存泄漏問(wèn)題
這篇文章主要介紹了Druid連接池未關(guān)閉導(dǎo)致內(nèi)存泄漏問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-12-12

