詳解Java 信號量Semaphore
Semaphore也是一個同步器,和前面兩篇說的CountDownLatch和CyclicBarrier不同,這是遞增的,初始化的時候可以指定一個值,但是不需要知道需要同步的線程個數(shù),只需要在同步的地方調(diào)用acquire方法時指定需要同步的線程個數(shù);
一.簡單使用
同步兩個子線程,只有其中兩個子線程執(zhí)行完畢,主線程才會執(zhí)行:
package com.example.demo.study;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class Study0217 {
//創(chuàng)建一個信號量的實例,信號量初始值為0
static Semaphore semaphore = new Semaphore(0);
public static void main(String[] args) throws Exception {
ExecutorService pool = Executors.newFixedThreadPool(3);
pool.submit(()->{
System.out.println("Thread1---start");
//信號量加一
semaphore.release();
});
pool.submit(()->{
System.out.println("Thread2---start");
//信號量加一
semaphore.release();
});
pool.submit(()->{
System.out.println("Thread3---start");
//信號量加一
semaphore.release();
});
//等待兩個子線程執(zhí)行完畢就放過,必須要信號量等于2才放過
semaphore.acquire(2);
System.out.println("兩個子線程執(zhí)行完畢");
//關(guān)閉線程池,正在執(zhí)行的任務(wù)繼續(xù)執(zhí)行
pool.shutdown();
}
}

這個信號量也可以復用,類似CyclicBarrier:
package com.example.demo.study;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class Study0217 {
//創(chuàng)建一個信號量的實例,信號量初始值為0
static Semaphore semaphore = new Semaphore(0);
public static void main(String[] args) throws Exception {
ExecutorService pool = Executors.newFixedThreadPool(3);
pool.submit(()->{
System.out.println("Thread1---start");
//信號量加一
semaphore.release();
});
pool.submit(()->{
System.out.println("Thread2---start");
//信號量加一
semaphore.release();
});
//等待兩個子線程執(zhí)行完畢就放過,必須要信號量等于2才放過
semaphore.acquire(2);
System.out.println("子線程1,2執(zhí)行完畢");
pool.submit(()->{
System.out.println("Thread3---start");
//信號量加一
semaphore.release();
});
pool.submit(()->{
System.out.println("Thread4---start");
//信號量加一
semaphore.release();
});
semaphore.acquire(2);
System.out.println("子線程3,4執(zhí)行完畢");
//關(guān)閉線程池,正在執(zhí)行的任務(wù)繼續(xù)執(zhí)行
pool.shutdown();
}
}

二.信號量原理
看看下面這個圖,可以知道信號量Semaphore還是根據(jù)AQS實現(xiàn)的,內(nèi)部有個Sync工具類操作AQS,還分為公平策略和非公平策略;

構(gòu)造器:
//默認是非公平策略
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
//可以根據(jù)第二個參數(shù)選擇是公平策略還是非公平策略
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
acquire(int permits)方法:
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
//AQS中的方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
//這里根據(jù)子類是公平策略還是非公平策略
if (tryAcquireShared(arg) < 0)
//獲取失敗會進入這里,將線程放入阻塞隊列,然后再嘗試,還是失敗的話就調(diào)用park方法掛起當前線程
doAcquireSharedInterruptibly(arg);
}
//非公平策略
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
//一個無限循環(huán),獲取state剩余的信號量,因為每調(diào)用一次release()方法的話,信號量就會加一,這里將
//最新的信號量減去傳進來的參數(shù)比較,比如有兩個線程,其中一個線程已經(jīng)調(diào)用了release方法,然后調(diào)用acquire(2)方法,那么
//這里remaining的值就是-1,返回-1,然后當前線程就會被丟到阻塞隊列中去了;如果另外一個線程也調(diào)用了release方法,
//那么此時的remaining==0,所以在這里的if中會調(diào)用CAS將0設(shè)置到state
//
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;
}
}
//公平策略
//和上面非公平差不多,只不過這里會查看阻塞隊列中當前節(jié)點前面有沒有前驅(qū)節(jié)點,有的話直接返回-1,
//就會把當前線程丟到阻塞隊列中阻塞去了,沒有前驅(qū)節(jié)點的話,就跟非公平模式一樣的了
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||compareAndSetState(available, remaining))
return remaining;
}
}
再看看release(int permits)方法:
//這個方法的作用就是將信號量加一
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
//AQS中方法
public final boolean releaseShared(int arg) {
//tryReleaseShared嘗試釋放資源
if (tryReleaseShared(arg)) {
//釋放資源成功就調(diào)用park方法喚醒喚醒AQS隊列中最前面的節(jié)點中的線程
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int releases) {
//一個無限循環(huán),獲取state,然后加上傳進去的參數(shù),如果新的state的值小于舊的state,說明已經(jīng)超過了state的最大值,溢出了
//沒有溢出的話,就用CAS更新state的值
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//ws==Node.SIGNAL表示節(jié)點中線程需要被喚醒
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//調(diào)用阻塞隊列中線程的unpark方法喚醒線程
unparkSuccessor(h);
}
//ws == 0表示節(jié)點中線程是初始狀態(tài)
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
以最上面的例子簡單說一下,其實不是很難,首先線程1和線程2分別去調(diào)用release方法,這個方法里面會將AQS中的state加一,但是在執(zhí)行這個操作之前,主線程肯定會先到acquire(2),在這個方法里面,假如默認使用非公平策略,首先獲取當前的信號量state(state的初始值是0),用當前信號量減去2,如果小于0,那么當前主線程就會丟到AQS隊列中阻塞;
這個時候線程1的release方法執(zhí)行了,于是就把信號量state加一(此時state==1),CAS更新state為一,成功的話,就調(diào)用doReleaseShared()方法喚醒AQS阻塞隊列中最先掛起的線程(這里就是因為調(diào)用acquire方法而阻塞的主線程),主線程喚醒之后又會去獲取最新的信號量,與2比較,發(fā)現(xiàn)還是小于0,于是又會阻塞;
線程2此時的release方法執(zhí)行完成,重復線程一的操作,主線程喚醒之后(此時state==2),又去獲取最新的信號量發(fā)現(xiàn)是2,減去acquire方法的參數(shù)2等于0,于是就用CAS更新state的值,然后acquire方法也就執(zhí)行完畢,主線程繼續(xù)執(zhí)行后面的代碼;
其實信號量還是很有意思的,記得在項目里,有人利用信號量實現(xiàn)了一個故障隔離,什么時候我可以把整理之后的代碼貼出來分享一下,還是很有意思的,就跟springcloud的熔斷機制差不多,場景是:比如你在service的一個方法調(diào)用第三方的接口,你不知道調(diào)不調(diào)得通,而且你不希望每次前端過來都會去調(diào)用,比如當調(diào)用失敗的次數(shù)超過100次,那么五分鐘之后才會再去實際調(diào)用這個第三方服務(wù)!這五分鐘內(nèi)前調(diào)用這個服務(wù),就會觸發(fā)我們這個故障隔離的機制,向前端返回一個特定的錯誤碼和錯誤信息!
以上就是詳解Java 信號量Semaphore的詳細內(nèi)容,更多關(guān)于Java 信號量Semaphore的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
如何使用java給局域網(wǎng)的電腦發(fā)送開機數(shù)據(jù)包
這篇文章主要為大家詳細介紹了如何使用java給局域網(wǎng)的電腦發(fā)送開機數(shù)據(jù)包,文中的示例代碼講解詳細,感興趣的小伙伴可以跟隨小編一起學習一下2025-09-09
在Java中基于Geotools對PostGIS數(shù)據(jù)庫的空間查詢實踐教程
本文將深入探討這一實踐,從連接配置到復雜空間查詢操作,包括點查詢、區(qū)域范圍查詢以及空間關(guān)系判斷等,全方位展示如何在 Java 環(huán)境下借助 Geotools 駕馭 PostGIS 數(shù)據(jù)庫,實現(xiàn)高效精準的空間數(shù)據(jù)檢索,為相關(guān)領(lǐng)域開發(fā)者提供實用的技術(shù)路徑,助力空間數(shù)據(jù)應(yīng)用的創(chuàng)新拓展2025-05-05
Eclipse+Maven構(gòu)建Hadoop項目的方法步驟
這篇文章主要介紹了Eclipse+Maven構(gòu)建Hadoop項目的方法步驟,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-02-02
SpringBoot快速通關(guān)自動配置應(yīng)用
在進行項目編寫前,我們還需要知道一個東西,就是SpringBoot對我們的SpringMVC還做了哪些配置,包括如何擴展,如何定制,只有把這些都搞清楚了,我們在之后使用才會更加得心應(yīng)手2022-07-07
一次線上websocket返回400問題排查的實戰(zhàn)記錄
最近項目中有端對端通信場景,實時性要求較高,考慮后選用了websocket 這一通信協(xié)議,下面這篇文章主要給大家介紹了一次線上websocket返回400問題排查的實戰(zhàn)記錄,需要的朋友可以參考下2022-04-04
關(guān)于java.lang.IllegalArgumentException異常的正確解決方法
java.lang.IllegalArgumentException 是 Java 編程語言中的一個運行時異常,通常表示向方法傳遞了一個不合法或不適當?shù)膮?shù),本文給大家分享有關(guān)java.lang.IllegalArgumentException異常的正確解決方法,感興趣的朋友跟隨小編一起看看吧2025-09-09

