詳解Java并發(fā)工具類之CountDownLatch和CyclicBarrier
在JDK的并發(fā)包中,有幾個非常有用的并發(fā)工具類,它們分別是:CountDownLatch、CyclicBarrier、Semaphore和Exchanger。
CountDownLatch(倒計時門閂):它允許一個或多個線程等待其他線程完成操作后再繼續(xù)執(zhí)行。它通過一個計數(shù)器來實現(xiàn),線程通過調用countDown()方法來減少計數(shù)器的值,await()方法進行阻塞等待計數(shù)器減少,當計數(shù)器達到零時,等待的線程將被釋放。CyclicBarrier(循環(huán)屏障):它允許一組線程互相等待,直到到達一個共同的屏障點,然后繼續(xù)執(zhí)行后續(xù)操作。與CountDownLatch不同的是,CyclicBarrier的計數(shù)器可以重復使用(reset()方法),當所有等待線程都到達屏障點后,計數(shù)器會重置,線程可以繼續(xù)下一次等待。Semaphore(信號量):它用于控制對某個資源的訪問權限。Semaphore維護了一組許可證,線程在訪問資源前需要獲取許可證,如果許可證不可用,則線程必須等待,直到有可用的許可證。Exchanger(交換器):它提供了一種線程間交換數(shù)據(jù)的機制。兩個線程可以通過Exchanger交換數(shù)據(jù),當兩個線程都調用exchange()方法后,他們會彼此交換數(shù)據(jù),并繼續(xù)執(zhí)行后續(xù)操作。
CountDownLatch
Latch(門閂)設計模式
當多個線程并發(fā)執(zhí)行任務,然后只有等待所有子任務全部完成進行匯總,程序的門閂才能打開讓程序繼續(xù)往下執(zhí)行。它指定了一個屏障,只有所有條件都滿足的時候,門閥才能打開。
比如小明和小紅相約周末去爬山,約定在人民廣場碰頭,然后一同出發(fā)去爬山,他們各自從家里出發(fā),無論是其中某一個先到達了人民廣場都要等待另一個到達之后才可以繼續(xù)進行下去,這里的人民廣場碰頭就相當于上述的門閂。

示例
還是使用上面的例子,我們模擬小明和小紅從家出發(fā),設定不同的等待時間模擬到達人民廣場的路程耗時。代碼如下
public static void main(String[] args) throws InterruptedException, ExecutionException {
? ? ? ?final int threadNum = 2;
? ? ? ?ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
? ? ? ?CountDownLatch countDownLatch = new CountDownLatch(threadNum);
?
? ? ? ?executorService.execute(() -> {
?
? ? ? ? ? ?System.out.println("小明開始出發(fā)");
? ? ? ? ? ?try {
? ? ? ? ? ? ? ?TimeUnit.SECONDS.sleep(2);
? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ?e.printStackTrace();
? ? ? ? ? }
? ? ? ? ? ?countDownLatch.countDown(); ?// 計數(shù)器 -1
? ? ? ? ? ?System.out.println("小明到達人民廣場");
?
?
? ? ? });
? ? ? ?executorService.execute(() -> {
?
? ? ? ? ? ?System.out.println("小紅開始出發(fā)");
? ? ? ? ? ?try {
? ? ? ? ? ? ? ?TimeUnit.SECONDS.sleep(3);
? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ?e.printStackTrace();
? ? ? ? ? }
? ? ? ? ? ?countDownLatch.countDown(); ?// 計數(shù)器 -1
? ? ? ? ? ?System.out.println("小紅到達人民廣場");
?
?
? ? ? });
? ? ? ?countDownLatch.await();
? ? ? ?System.out.println("小明和小紅都到達了人民廣場,開始一起出發(fā)去爬山");
? ? ? ?executorService.shutdown();
?
? }
?結果
小明開始出發(fā)
小紅開始出發(fā)
小明到達人民廣場 // 2s后打印
小紅到達人民廣場 // 3s后打印
小明和小紅都到達了人民廣場,開始一起出發(fā)去爬山 //3s后打印
?
Process finished with exit code 0
與Join()的區(qū)別
可能這里會有疑問,使用Thread.join()也可以實現(xiàn)相同的功能,這與使用CountDownLatch有什么區(qū)別呢?
join()的實現(xiàn)
public static void main(String[] args) throws InterruptedException {
? ?Thread thread1 = new Thread(() -> {
? ? ? ?System.out.println("小明開始出發(fā)");
? ? ? ?try {
? ? ? ? ? ?TimeUnit.SECONDS.sleep(2);
? ? ? } catch (InterruptedException e) {
? ? ? ? ? ?e.printStackTrace();
? ? ? }
? ? ? ?System.out.println("小明到達人民廣場");
?
? }, "thread1");
? ?Thread thread2 = new Thread(() -> {
? ? ? ?System.out.println("小紅開始出發(fā)");
? ? ? ?try {
? ? ? ? ? ?TimeUnit.SECONDS.sleep(3);
? ? ? } catch (InterruptedException e) {
? ? ? ? ? ?e.printStackTrace();
? ? ? }
? ? ? ?System.out.println("小紅到達人民廣場");
?
? }, "thread2");
?
? ?thread1.start();
? ?thread2.start();
? ?thread1.join();
? ?thread2.join();
? ?System.out.println("小明和小紅都到達了人民廣場,開始一起出發(fā)去爬山");
?
}結果
小明開始出發(fā)
小紅開始出發(fā)
小明到達人民廣場 // 2s后打印
小紅到達人民廣場 // 3s后打印
小明和小紅都到達了人民廣場,開始一起出發(fā)去爬山 //3s后打印
?
Process finished with exit code 0
發(fā)現(xiàn)使用join()實現(xiàn)和countDownCatch實現(xiàn)好像在代碼上的體現(xiàn)并沒有太大差異,不急,我們接著往下看
join()實現(xiàn)原理
我們點進去join的jdk源碼查看它的實現(xiàn)邏輯
public final void join() throws InterruptedException {
? ?join(0);
}
?
public final synchronized void join(long millis)
? ?throws InterruptedException {
? ? ? ?long base = System.currentTimeMillis();
? ? ? ?long now = 0;
?
? ? ? ?if (millis < 0) {
? ? ? ? ? ?throw new IllegalArgumentException("timeout value is negative");
? ? ? }
// 調用join真正執(zhí)行的方法
? ? ? ?if (millis == 0) {
? ? ? ? ? ?while (isAlive()) {
? ? ? ? ? ? ? ?wait(0);
? ? ? ? ? }
? ? ? } else {
? ? ? ? ? ?while (isAlive()) {
? ? ? ? ? ? ? ?long delay = millis - now;
? ? ? ? ? ? ? ?if (delay <= 0) {
? ? ? ? ? ? ? ? ? ?break;
? ? ? ? ? ? ? }
? ? ? ? ? ? ? ?wait(delay);
? ? ? ? ? ? ? ?now = System.currentTimeMillis() - base;
? ? ? ? ? }
? ? ? }
? }
?我們看到他的核心代碼就幾行
while (isAlive()) {
? ? wait(0);
}這幾行代碼不難理解,通過不停的檢查join線程是否存活,如果線程狀態(tài)是活動的,那么就一直等待下去(wait(0)表示永久等待),直到join線程中止后,線程的this.notifyAll()方法會被調用,不過調用notifyAll()方法是在JVM里 實現(xiàn)的,所以在JDK里看不到。
Join()與countDownLatch比較
回到上一個問題,join到底和countDownLatch有什么區(qū)別,countDownLatch底層使用了計數(shù)器來控制線程的喚醒,提供了更細粒度的線程控制,比如我們運行了100個線程,但是只需要80個線程執(zhí)行結束就可以繼續(xù)下去,那么使用join就不合適了。
綜上所述 CountDownLatch相對于Join的優(yōu)勢:
CountDownLatch可以等待多個線程的完成,而Join只能等待一個線程。CountDownLatch可以靈活地設置計數(shù)器的值,不僅僅限于線程數(shù),可以根據(jù)需要自由控制。CountDownLatch提供了更細粒度的線程間協(xié)作和控制,可以在任意位置進行countDown()和await()的調用,更靈活地控制線程的流程。
CountDownLatch程序實現(xiàn)
上面說了很多CountDownLatch的示例和與join比較,也提了一下CountDownLatch底層的原理,下面就看一下如何實現(xiàn)一個簡單的CountDownLatch
程序
我們先新建一個抽象類,包含countDownLatch需要的參數(shù)和方法
public abstract class Latch {
? ?// 控制了多少線程完成后門閥才能打開
? ?protected int limit;
?
? ?// 構造函數(shù)
? ?public Latch(int limit){
? ? ? ?this.limit = limit;
? }
?
? ?// 方法使得線程一直等待
? ?public abstract void await() throws InterruptedException;
?
? ?// 當前任務線程完成工作之后調用該方法使得計數(shù)器減一
? ?public abstract void countDown();
?
? ?// 獲取當前還有多少個線程沒有完成任務
? ?public abstract int getUnArrived();
?
}然后實現(xiàn)這個抽象類,并寫入具體邏輯代碼
public class CountDownLatch extends Latch {
?
? ?private final Lock lock = new ReentrantLock();
? ?private final Condition condition = lock.newCondition();
?
?
? ?public CountDownLatch(int limit) {
? ? ? ?super(limit);
? }
?
? ?@Override
? ?public void await() throws InterruptedException {
? ? ? ?lock.lock();
? ? ? ?while (limit > 0){
? ? ? ? ? ?condition.await();
? ? ? }
? ? ? ?lock.unlock();
? }
?
? ?@Override
? ?public void countDown() {
?
? ? ? ?lock.lock();
? ? ? ?if(limit < 0){
? ? ? ? ? ?throw new IllegalStateException();
? ? ? }
? ? ? ?limit--;
? ? ? ?condition.signalAll();
?
? ? ? ?lock.unlock();
?
? }
?
? ?@Override
? ?public int getUnArrived() {
? ? ? ?return limit;
? }
}測試
public class LatchDemo {
? ?public static void main(String[] args) throws InterruptedException {
? ? ? ?Latch latch = new CountDownLatch(2);
?
? ? ? ?ExecutorService executorService = Executors.newFixedThreadPool(2);
?
? ? ? ?executorService.execute(()->{
? ? ? ? ? ?System.out.println("小明開始出發(fā)");
? ? ? ? ? ?try {
? ? ? ? ? ? ? ?TimeUnit.SECONDS.sleep(2);
? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ?e.printStackTrace();
? ? ? ? ? }
? ? ? ? ? ?latch.countDown(); ?// 計數(shù)器 -1
? ? ? ? ? ?System.out.println("小明到達人民廣場");
? ? ? });
? ? ? ?executorService.execute(()->{
? ? ? ? ? ?System.out.println("小紅開始出發(fā)");
? ? ? ? ? ?try {
? ? ? ? ? ? ? ?TimeUnit.SECONDS.sleep(3);
? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ?e.printStackTrace();
? ? ? ? ? }
? ? ? ? ? ?latch.countDown(); ?// 計數(shù)器 -1
? ? ? ? ? ?System.out.println("小紅到達人民廣場");
? ? ? });
?
?
? ? ? ?latch.await();
? ? ? ?System.out.println("小明和小紅都到達了人民廣場,開始一起出發(fā)去爬山");
?
? ? ? ?executorService.shutdown();
?
? }
}結果
小明開始出發(fā)
小紅開始出發(fā)
小明到達人民廣場
小紅到達人民廣場
小明和小紅都到達了人民廣場,開始一起出發(fā)去爬山
?
Process finished with exit code 0
可以看到結果如前文一致,這就實現(xiàn)了一個簡單的CountDownLatch,當然具體實現(xiàn)還有更多的細節(jié),如有需要,請翻閱源碼。
總結
通過上面的簡單實現(xiàn),我們可以看到CountDownLatch基于計數(shù)器實現(xiàn)了多線程之間的門閥攔截,底層還是通過線程之間的通訊、鎖和計數(shù)器控制。
CyclicBarrier
除了使用CountDownLatch來實現(xiàn)多線程之間的阻塞同步,也可以使用CyclicBarrier來實現(xiàn),并且CyclicBarrier提供了比CountDownLatch更強大的功能。
CyclicBarrier的字面意思是可循環(huán)使用的屏障。它提供了一種同步機制,使一組線程能夠在達到屏障時被阻塞,直到最后一個線程到達屏障時,屏障才會開啟,所有被阻塞的線程才能繼續(xù)執(zhí)行。
網上找的一張示意圖

示例
還是用之前的例子,模擬小明和小紅去爬山,代碼如下,結果就不贅述了。
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
?
? ?CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
?
? ?ExecutorService executorService = Executors.newFixedThreadPool(2);
?
? ?executorService.execute(()->{
? ? ? ?System.out.println("小明開始出發(fā)");
? ? ? ?try {
? ? ? ? ? ?TimeUnit.SECONDS.sleep(2);
? ? ? ? ? ?System.out.println("小明到達人民廣場");
? ? ? ? ? ?cyclicBarrier.await(); // 計數(shù)器 -1
? ? ? } catch (Exception e) {
? ? ? ? ? ?e.printStackTrace();
? ? ? }
?
? });
? ?executorService.execute(()->{
? ? ? ?System.out.println("小紅開始出發(fā)");
? ? ? ?try {
? ? ? ? ? ?TimeUnit.SECONDS.sleep(3);
? ? ? ? ? ?System.out.println("小紅到達人民廣場");
? ? ? ? ? ?cyclicBarrier.await(); // 計數(shù)器 -1
? ? ? } catch (Exception e) {
? ? ? ? ? ?e.printStackTrace();
? ? ? }
? });
? ?cyclicBarrier.await();
? ?System.out.println("小明和小紅都到達了人民廣場,開始一起出發(fā)去爬山");
?
? ?executorService.shutdown();
?
}不同的是,這里我設置了三個屏障點 cyclicBarrier.await();,而使用CountDownLatch只用了兩個計數(shù)器減一操作 + 一個wait()方法,使用起來很相似,我們說cyclicBarrier 比 CountDownLatch 功能更強大,那么強大在哪里呢?
重置計數(shù)器和獲取狀態(tài)
重置計數(shù)器
CountDownLatch的計數(shù)器只能使用一次,而CyclicBarrier的計數(shù)器可以使用reset()方法重置。所以CyclicBarrier能處理更為復雜的業(yè)務場景。例如,如果計算發(fā)生錯誤,可以重置計數(shù)器,并讓線程重新執(zhí)行一次。
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
?
? ?final int threadNum = 3;
? ?ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
? ?CyclicBarrier cyclicBarrier = new CyclicBarrier(threadNum, () -> {
? ? ? ?System.out.println("所有線程都到達屏障");
? });
?
? ?for (int i = 0; i < threadNum; i++) {
? ? ? ?executorService.execute(() -> {
? ? ? ? ? ?System.out.println(Thread.currentThread().getName() + " 到達屏障");
? ? ? ? ? ?try {
? ? ? ? ? ? ? ?cyclicBarrier.await();
? ? ? ? ? } catch (InterruptedException | BrokenBarrierException e) {
? ? ? ? ? ? ? ?e.printStackTrace();
? ? ? ? ? }
? ? ? });
? }
?
? ?Thread.sleep(2000); // 等待一段時間,確保所有線程都到達屏障
?
? ?cyclicBarrier.reset(); // 重置屏障
?
? ?System.out.println("屏障已重置");
?
? ?for (int i = 0; i < threadNum-1; i++) {
? ? ? ?executorService.execute(() -> {
? ? ? ? ? ?try {
? ? ? ? ? ? ? ?TimeUnit.SECONDS.sleep(1);
? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ?e.printStackTrace();
? ? ? ? ? }
? ? ? ? ? ?System.out.println(Thread.currentThread().getName() + " 到達屏障");
? ? ? ? ? ?try {
? ? ? ? ? ? ? ?cyclicBarrier.await();
? ? ? ? ? } catch (InterruptedException | BrokenBarrierException e) {
? ? ? ? ? ? ? ?e.printStackTrace();
? ? ? ? ? }
? ? ? });
? }
?
? ?System.out.println("第二次進入 循環(huán)屏障");
? ?cyclicBarrier.await();
? ?System.out.println("第二次循環(huán) 邁過屏障");
?
?
? ?executorService.shutdown();
?
?
}結果
pool-1-thread-2 到達屏障
pool-1-thread-3 到達屏障
pool-1-thread-1 到達屏障
所有線程都到達屏障
屏障已重置
第二次進入 循環(huán)屏障
pool-1-thread-2 到達屏障
pool-1-thread-1 到達屏障
所有線程都到達屏障
第二次循環(huán) 邁過屏障
?
Process finished with exit code 0
先說一下CyclicBarrier提供的另一個構造函數(shù)CyclicBarrier(int parties,Runnable barrierAction),用于在線程到達屏障時,優(yōu)先執(zhí)行barrierAction,也就是上方代碼中用到的這幾段
CyclicBarrier cyclicBarrier = new CyclicBarrier(threadNum, () -> {
? ?System.out.println("所有線程都到達屏障");
});這里用于提示所有的線程到達屏障。緊接著是比較常規(guī)的代碼,循環(huán)構造線程并在線程中執(zhí)行了 cyclicBarrier.await();到達屏障。重點是 cyclicBarrier.reset();重置屏障后,我留下一個屏障給主線程測試使用,而在新構造的線程中停留1s, System.out.println("第二次循環(huán) 邁過屏障");打印在 System.out.println(Thread.currentThread().getName() + " 到達屏障");之后,說明屏障計數(shù)器已經重置并且生效了。
獲取狀態(tài)
除了上述的基本功能外,CyclicBarrier也提供了以下API用來查看狀態(tài),
getNumberWaiting() // 顧名思義,獲取目前正在屏障處阻塞等待的線程數(shù)量。getParties() // 獲取屏障數(shù)量 也就是我們傳入構造函數(shù)中的parties參數(shù)isBroken() // 查詢阻塞的線程是否被中斷
以上就是詳解Java并發(fā)工具類之CountDownLatch和CyclicBarrier的詳細內容,更多關于Java CountDownLatch CyclicBarrier的資料請關注腳本之家其它相關文章!
相關文章
線程池滿Thread?pool?exhausted排查和解決方案
這篇文章主要介紹了線程池滿Thread?pool?exhausted排查和解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-11-11
Spring裝配Bean之用Java代碼安裝配置bean詳解
這篇文章主要給大家介紹了關于Spring裝配Bean之用Java代碼安裝配置bean的相關資料,文中通過示例代碼介紹的非常詳細,對大家學習或者使用spring具有一定的參考學習價值,需要的朋友們下面來一起學習學習吧。2017-10-10
Java Map如何根據(jù)key取value以及不指定key取出所有的value
這篇文章主要介紹了Java Map如何根據(jù)key取value以及不指定key取出所有的value,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-09-09
關于springcloud報錯報UnsatisfiedDependencyException的問題
這篇文章主要介紹了關于springcloud報錯報UnsatisfiedDependencyException的問題,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-11-11
java 使用URLDecoder和URLEncoder對中文進行處理
這篇文章主要介紹了java 使用URLDecoder和URLEncoder對中文進行處理的相關資料,需要的朋友可以參考下2017-02-02

