Java并發(fā)編程之Condition源碼分析(推薦)
Condition介紹
上篇文章講了ReentrantLock的加鎖和釋放鎖的使用,這篇文章是對ReentrantLock的補(bǔ)充。ReentrantLock#newCondition()可以創(chuàng)建Condition,在ReentrantLock加鎖過程中可以利用Condition阻塞當(dāng)前線程并臨時(shí)釋放鎖,待另外線程獲取到鎖并在邏輯后通知阻塞線程"激活"。Condition常用在基于異步通信的同步機(jī)制實(shí)現(xiàn)中,比如dubbo中的請求和獲取應(yīng)答結(jié)果的實(shí)現(xiàn)。
常用方法
Condition中主要的方法有2個(gè)
- (1)await()方法可以阻塞當(dāng)前線程,并釋放鎖。
- (2)在獲取鎖后可以調(diào)用signal()通知被await()阻塞的線程"激活"。
這里的await(),signal()必須在ReentrantLock#lock()和ReentrantLock#unlock()之間調(diào)用。
Condition實(shí)現(xiàn)分析
Condition的實(shí)現(xiàn)也是利用AbstractQueuedSynchronizer隊(duì)列來實(shí)現(xiàn),await()在被調(diào)用后先將當(dāng)前線程加入到等待隊(duì)列中,然后釋放鎖,最后阻塞當(dāng)前線程。signal()在被調(diào)用后會(huì)先獲取等待隊(duì)列中第一個(gè)節(jié)點(diǎn),并將這個(gè)節(jié)點(diǎn)轉(zhuǎn)化成ReentrantLock中的節(jié)點(diǎn)并加入到同步阻塞隊(duì)列的結(jié)尾,這樣此節(jié)點(diǎn)的上個(gè)節(jié)點(diǎn)線程釋放鎖后會(huì)激活此節(jié)點(diǎn)線程取來獲取鎖。
await()方法源碼分析
await()源碼如下
public final void await() throws InterruptedException {
//判斷是否當(dāng)前線程是否被中斷中斷則拋出中斷異常
if (Thread.interrupted())
throw new InterruptedException();
//加入等待隊(duì)列
Node node = addConditionWaiter();
//釋放當(dāng)前線程鎖
int savedState = fullyRelease(node);
int interruptMode = 0;
//判斷是否在同步阻塞隊(duì)列,如果不在一直循環(huán)到被加入
while (!isOnSyncQueue(node)) {
//阻塞當(dāng)前線程
LockSupport.park(this);
//判斷是否被中斷
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//獲取鎖,如果獲取中被中斷則設(shè)置中斷狀態(tài)
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//清除等待隊(duì)列中被"激活"的節(jié)點(diǎn)
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
//如果當(dāng)前線程被中斷,處理中斷邏輯
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
主要分以下幾步
- (1)先判斷是否當(dāng)前線程是否被中斷中斷則拋出中斷異常如果未中斷調(diào)用addConditionWaiter()加入等待隊(duì)列
- (2)調(diào)用fullyRelease(node)釋放鎖使同步阻塞隊(duì)列的下個(gè)節(jié)點(diǎn)線程能獲取鎖。
- (3)調(diào)用isOnSyncQueue(node)判斷是否在同步阻塞隊(duì)列,這里的加入同步阻塞隊(duì)列操作是在另一個(gè)線程調(diào)用signal()后加入,如果不在同步阻塞隊(duì)列會(huì)進(jìn)行阻塞直到被激活。
- (4)如果被激活然后調(diào)用checkInterruptWhileWaiting(node)判斷是否被中斷并獲取中斷模式。
- (5)繼續(xù)調(diào)用isOnSyncQueue(node)判斷是否在同步阻塞隊(duì)列。
- (6)是則調(diào)用acquireQueued(node, savedState) 獲取鎖,這里如果獲取不到也會(huì)被阻塞,獲取不到原因是在第一次調(diào)用isOnSyncQueue(node)前,可能另一個(gè)線程已經(jīng)調(diào)用signal()后加入到同步阻塞隊(duì)列,然后調(diào)用acquireQueued(node, savedState) 獲取不到鎖并阻塞。acquireQueued(node, savedState)也會(huì)返回當(dāng)前線程是否被中斷,如果被中斷設(shè)置中斷模式。
- (7)在激活后調(diào)用unlinkCancelledWaiters()清理等待隊(duì)列的已經(jīng)被激活的節(jié)點(diǎn)。
- (8)最后判斷當(dāng)前線程是否被中斷,如果被中斷則對中斷線程做處理。
下面來看下addConditionWaiter()實(shí)現(xiàn)
private Node addConditionWaiter() {
//獲取等待隊(duì)列尾部節(jié)點(diǎn)
Node t = lastWaiter;
//如果尾部狀態(tài)不為CONDITION,如果已經(jīng)被"激活",清理之,然后重新獲取尾部節(jié)點(diǎn)
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//創(chuàng)建以當(dāng)前線程為基礎(chǔ)的節(jié)點(diǎn),并將節(jié)點(diǎn)模式設(shè)置成CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//如果尾節(jié)點(diǎn)不存在,說明隊(duì)列為空,將頭節(jié)點(diǎn)設(shè)置成當(dāng)前節(jié)點(diǎn)
if (t == null)
firstWaiter = node;
//如果尾節(jié)點(diǎn)存在,將此節(jié)點(diǎn)設(shè)置成尾節(jié)點(diǎn)的下個(gè)節(jié)點(diǎn)
else
t.nextWaiter = node;
//將尾節(jié)點(diǎn)設(shè)置成當(dāng)前節(jié)點(diǎn)
lastWaiter = node;
return node;
}
addConditionWaiter()的邏輯很簡單,就是創(chuàng)建以當(dāng)前線程為基礎(chǔ)的節(jié)點(diǎn)并把節(jié)點(diǎn)加入等待隊(duì)列的尾部待其他線程處理。
下面來看下fullyRelease(Node node)實(shí)現(xiàn)
final int fullyRelease(Node node) {
boolean failed = true;
try {
//獲取阻塞隊(duì)列中當(dāng)前線程節(jié)點(diǎn)的鎖狀態(tài)值
int savedState = getState();
//釋放當(dāng)前線程節(jié)點(diǎn)鎖
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
//釋放失敗講節(jié)點(diǎn)等待狀態(tài)設(shè)置成關(guān)閉
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
調(diào)用getState()先獲取阻塞隊(duì)列中當(dāng)前線程節(jié)點(diǎn)的鎖狀態(tài)值,這個(gè)值可能大于1表示多次重入,然后調(diào)用release(savedState)釋放所有鎖,如果釋放成功返回鎖狀態(tài)值。
下面來看下isOnSyncQueue(Node node)實(shí)現(xiàn)
final boolean isOnSyncQueue(Node node) {
//判斷當(dāng)前節(jié)點(diǎn)是否是CONDITION或者前置節(jié)點(diǎn)是否為空如果為空直接返回false
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
//如果下個(gè)節(jié)點(diǎn)存在,則在同步阻塞隊(duì)列中返回true
if (node.next != null) // If has successor, it must be on queue
return true;
//遍歷查找當(dāng)前節(jié)點(diǎn)是否在同步阻塞隊(duì)列中
return findNodeFromTail(node);
}
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
此方法的功能是查找當(dāng)前節(jié)點(diǎn)是否在同步阻塞隊(duì)列中,方法先是快速判斷,判斷不了再進(jìn)行遍歷查找。
- (1)第一步先判斷次節(jié)點(diǎn)是否CONDITION狀態(tài)或者前置節(jié)點(diǎn)是否存在,如果是表明不在隊(duì)列中返回false,阻塞隊(duì)列中的狀態(tài)一般是0或者SIGNAL狀態(tài)而且如果當(dāng)前如果當(dāng)前節(jié)點(diǎn)在隊(duì)列阻塞中且未被激活前置節(jié)點(diǎn)一定不為空。
- (2)第二步判斷節(jié)點(diǎn)的下個(gè)節(jié)點(diǎn)是否存在,如果存在則表明當(dāng)前當(dāng)前節(jié)點(diǎn)已加入到阻塞隊(duì)列中。
- (3)如果以上2點(diǎn)都沒法判斷,也有可能剛剛加入到同步阻塞隊(duì)列中,所以調(diào)用findNodeFromTail(Node node)做最后的遍歷查找。查找從隊(duì)列尾部開始查,從尾部開始查的原因是可能剛剛加入到同步阻塞隊(duì)列中,從尾部能快速定位。
下面看下checkInterruptWhileWaiting(Node node)實(shí)現(xiàn)
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
此方法在線程被激活后被調(diào)用,主要功能就是判斷被激活的線程是否被中斷。此方法會(huì)返回2種中斷狀態(tài)THROW_IE和REINTERRUPT,THROW_IE是調(diào)用signal()前被中斷返回,REINTERRUPT在調(diào)用signal()后被中斷返回。 此方法先判斷是否被標(biāo)記中斷,是的話再調(diào)用transferAfterCancelledWait(node)取判斷是那種中斷狀態(tài),transferAfterCancelledWait(node)方法分2步
- (1)用CAS方式將節(jié)點(diǎn)狀態(tài)改錯(cuò)等待狀態(tài)改成CONDITION,并加入到同步阻塞隊(duì)列中返回true
- (2)如果不能加入到同步阻塞隊(duì)列就自旋一直等待加入
如果使用await()方法上面2步其實(shí)是沒什么作用其最后一定會(huì)返回false,因?yàn)閍wait()被激活只能調(diào)用 signal()方法,而signal()方法肯定已經(jīng)將節(jié)點(diǎn)加入到同步阻塞隊(duì)列中。所以以上邏輯是給await(long time, TimeUnit unit)等帶超時(shí)激活方法用的。
acquireQueued(node, savedState)方法再上一章節(jié)已經(jīng)講過這邊就不重復(fù)了,下面分析下unlinkCancelledWaiters()方法
private void unlinkCancelledWaiters() {
//獲取等待隊(duì)列頭節(jié)點(diǎn)
Node t = firstWaiter;
Node trail = null;
while (t != null) {
//獲取下個(gè)節(jié)點(diǎn)
Node next = t.nextWaiter;
//如果狀態(tài)不為CONDITION說明已經(jīng)加入阻塞隊(duì)列需要清理掉
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
//獲取下個(gè)節(jié)點(diǎn)
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
此方法就是從頭開始查找狀態(tài)不為CONDITION的節(jié)點(diǎn)并清理,狀態(tài)不為CONDITION節(jié)點(diǎn)說明此節(jié)點(diǎn)已經(jīng)加入到阻塞隊(duì)列,已經(jīng)不需要維護(hù)。
下面來看下reportInterruptAfterWait(interruptMode)方法
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
//如果是THROW_IE模式直接拋出異常
if (interruptMode == THROW_IE)
throw new InterruptedException();
//如果是REINTERRUPT模式標(biāo)記線程中斷由上層處理中斷
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
此方法處理中斷邏輯。如果是THROW_IE模式直接拋出異常,如果是REINTERRUPT模式標(biāo)記線程中斷由上層處理中斷。
signal()方法源碼分析
signal()源碼如下
public final void signal() {
//是否當(dāng)前線程持有鎖
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
//通知"激活"頭節(jié)點(diǎn)線程
if (first != null)
doSignal(first);
}
先調(diào)用isHeldExclusively()判斷鎖是否被當(dāng)前線程持有,然后檢查等待隊(duì)列是否為空,不為空就是可以取第一個(gè)節(jié)點(diǎn)調(diào)用doSignal(first)去"激活",這里激活不是真正的激活而只是將節(jié)點(diǎn)加入到同步阻塞隊(duì)列尾部,所以上下文中帶""的激活都是這種解釋。
下面看下isHeldExclusively()實(shí)現(xiàn)
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
實(shí)現(xiàn)就是比較下當(dāng)前線程和持有鎖的線程是否同一個(gè)
下面看下doSignal(first)的實(shí)現(xiàn)
private void doSignal(Node first) {
do {
//頭指頭后移一位,如果后面的節(jié)點(diǎn)為空,則將尾指頭也指向空,說明隊(duì)列為空了
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
//清空頭節(jié)點(diǎn)的下個(gè)節(jié)點(diǎn)
first.nextWaiter = null;
//如果"激活"失敗者取下個(gè)繼續(xù),直到成功或者遍歷完
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
此方法就是取當(dāng)前頭節(jié)點(diǎn)一直去嘗試"激活",直到成功或者遍歷完。
下面來看下transferForSignal(first)方法
final boolean transferForSignal(Node node) {
//將CONDITION狀態(tài)設(shè)置成0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//加入到同步阻塞隊(duì)列
Node p = enq(node);
int ws = p.waitStatus;
//狀態(tài)異常直接激活
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
(1)此方法先先將CONDITION狀態(tài)設(shè)置成0,因?yàn)槿绻荂ONDITION狀態(tài)加入到同步阻塞隊(duì)列,激活的時(shí)候是不識(shí)別的。
(2)加入到同步阻塞隊(duì)列的尾部。所以同步阻塞隊(duì)列中前面如果有多個(gè)在排隊(duì),調(diào)用unlock()不會(huì)馬上激活此節(jié)點(diǎn)。
(3)狀態(tài)異常直接調(diào)用unpark激活,這邊按理說如果狀態(tài)異常情況下激活,await()在調(diào)用unlock()被激活后會(huì)進(jìn)行相應(yīng)的異常處理,但看await()代碼沒有處理則是正常執(zhí)行。
這個(gè)方法主要就是把節(jié)點(diǎn)加入到同步阻塞隊(duì)列的,真正的激活則是調(diào)用unlock()去處理。
以上所述是小編給大家介紹的Java并發(fā)編程之Condition源碼分析詳解整合,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會(huì)及時(shí)回復(fù)大家的。在此也非常感謝大家對腳本之家網(wǎng)站的支持!
相關(guān)文章
SpringSecurity實(shí)現(xiàn)踢出指定用戶的示例
SpringSecurity中使用SessionRegistryImpl類可以獲取session信息并踢出用戶,這篇文章主要介紹了SpringSecurity實(shí)現(xiàn)踢出指定用戶的示例,需要的朋友可以參考下2025-03-03
Idea啟動(dòng)多個(gè)SpringBoot項(xiàng)目的3種最新方案
SpringBoot自帶Tomcat,直接運(yùn)行main方法里面的SpringApplication.run即可,并且訪問時(shí)不需要帶項(xiàng)目名,這篇文章主要介紹了Idea啟動(dòng)多個(gè)SpringBoot項(xiàng)目的3種方案,需要的朋友可以參考下2023-02-02
使用Spring Boot實(shí)現(xiàn)操作數(shù)據(jù)庫的接口的過程
本文給大家分享使用Spring Boot實(shí)現(xiàn)操作數(shù)據(jù)庫的接口的過程,包括springboot原理解析及實(shí)例代碼詳解,感興趣的朋友跟隨小編一起看看吧2021-07-07
JAVA?Springboot配置i18n國際化語言詳細(xì)步驟
國際化(Internationalization,縮寫為i18n)是指根據(jù)來展示不同的內(nèi)容,使應(yīng)用程序能夠適應(yīng)不同的語言和文化習(xí)慣,下面這篇文章主要給大家介紹了關(guān)于JAVA?Springboot配置i18n國際化語言的詳細(xì)步驟,需要的朋友可以參考下2024-08-08
Springboot打包代碼,反編譯后代碼混淆方式(防止還原代碼)
文章主要介紹了如何對Spring Boot項(xiàng)目進(jìn)行jar包混淆,以防止反編譯還原原始代碼,通過在項(xiàng)目中添加proguard.cfg文件并配置Maven插件,可以實(shí)現(xiàn)代碼混淆,從而增加反編譯的難度2024-11-11
SpringBoot中的ApplicationRunner與CommandLineRunner問題
這篇文章主要介紹了SpringBoot中的ApplicationRunner與CommandLineRunner問題,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-09-09
mybatis映射文件操作存儲(chǔ)過程的實(shí)現(xiàn)
本文主要介紹了mybatis映射文件操作存儲(chǔ)過程的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-03-03

