Java多線程之條件對象Condition
1 簡介
Condition中的await()方法相當(dāng)于Object的wait()方法,Condition中的signal()方法相當(dāng)于Object的notify()方法,Condition中的signalAll()相當(dāng)于Object的notifyAll()方法。
不同的是,Object中的wait() ,notify() ,notifyAll()方法是和"同步鎖"(synchronized關(guān)鍵字)捆綁使用的;而Condition是需要與"互斥鎖"/"共享鎖"捆綁使用的。
2 Condition的實現(xiàn)分析
Condition是同步器AbstractQueuedSynchronized的內(nèi)部類,因為Condition的操作需要獲取相關(guān)的鎖,所以作為同步器的內(nèi)部類比較合理。每個Condition對象都包含著一個隊列(等待隊列),該隊列是Condition對象實現(xiàn)等待/通知功能的關(guān)鍵。
等待隊列
等待隊列是一個FIFO的隊列,隊列的每一個節(jié)點都包含了一個線程引用,該線程就是在Condition對象上等待的線程,如果一個線程調(diào)用了await()方法,該線程就會釋放鎖、構(gòu)造成節(jié)點進入等待隊列并進入等待狀態(tài)。

這里的節(jié)點定義也就是AbstractQueuedSynchronizer.Node的定義。
一個Condition包含一個等待隊列,Condition擁有首節(jié)點(firstWaiter)和尾節(jié)點(lastWaiter)。當(dāng)前線程調(diào)用Condition.await()方法時,將會以當(dāng)前線程構(gòu)造節(jié)點,并將節(jié)點從尾部加入等待隊列。
在Object的監(jiān)視器模型上,一個對象擁有一個同步隊列和等待隊列,而Lock(同步器)擁有一個同步隊列和多個等待隊列。

等待(await):AbstractQueuedLongSynchronizer中實現(xiàn)
調(diào)用Condition的await()方法,會使當(dāng)前線程進入等待隊列并釋放鎖,同時線程狀態(tài)變?yōu)榈却隣顟B(tài)。
從隊列的角度來看,相當(dāng)于同步隊列的首節(jié)點(獲取了鎖的節(jié)點)移動到Condition的等待隊列中。
當(dāng)?shù)却犃兄械墓?jié)點被喚醒,則喚醒節(jié)點的線程開始嘗試獲取同步狀態(tài)。如果不是通過Condition.signal()方法喚醒,而是對等待線程進行中斷,則拋出InterruptedException。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
long savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
Condition等待通知的本質(zhì)
總的來說,Condition的本質(zhì)就是等待隊列和同步隊列的交互:
當(dāng)一個持有鎖的線程調(diào)用Condition.await()時,它會執(zhí)行以下步驟:
構(gòu)造一個新的等待隊列節(jié)點加入到等待隊列隊尾
釋放鎖,也就是將它的同步隊列節(jié)點從同步隊列隊首移除
自旋,直到它在等待隊列上的節(jié)點移動到了同步隊列(通過其他線程調(diào)用signal() )或被中斷
阻塞當(dāng)前節(jié)點,直到它獲取到了鎖,也就是它在同步隊列上的節(jié)點排隊排到了隊首。
當(dāng)一個持有鎖的線程調(diào)用Condition.signal()時,它會執(zhí)行以下操作:
從等待隊列的隊首開始,嘗試對隊首節(jié)點執(zhí)行喚醒操作;如果節(jié)點CANCELLED,就嘗試喚醒下一個節(jié)點;如果再CANCELLED則繼續(xù)迭代。
對每個節(jié)點執(zhí)行喚醒操作時,首先將節(jié)點加入同步隊列,此時await()操作的步驟3的解鎖條件就已經(jīng)開啟了。
然后分兩種情況討論:
如果先驅(qū)節(jié)點的狀態(tài)為CANCELLED(>0) 或設(shè)置先驅(qū)節(jié)點的狀態(tài)為SIGNAL失敗,那么就立即喚醒當(dāng)前節(jié)點對應(yīng)的線程,此時await()方法就會完成步驟3,進入步驟4.
如果成功把先驅(qū)節(jié)點的狀態(tài)設(shè)置為了SIGNAL,那么就不立即喚醒了。等到先驅(qū)節(jié)點成為同步隊列首節(jié)點并釋放了同步狀態(tài)后,會自動喚醒當(dāng)前節(jié)點對應(yīng)線程的,這時候await()的步驟3才執(zhí)行完成,而且有很大概率快速完成步驟4.
通知(signal):AbstractQueuedLongSynchronizer中實現(xiàn)
調(diào)用Condition的signal()方法,將會喚醒在等待隊列中等待時間最長的節(jié)點(首節(jié)點),在喚醒節(jié)點之前,會將節(jié)點移到同步隊列中。
Condition的signalAll()方法,相當(dāng)于對等待隊列中的每個節(jié)點均執(zhí)行一次signal()方法,將等待隊列中的節(jié)點全部移動到同步隊列中,并喚醒每個節(jié)點的線程。
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
最后還要注意,Java 中有 signal 和 signalAll 兩種方法,signal 是隨機解除一個等待集中的線程的阻塞狀態(tài),signalAll 是解除所有等待集中的線程的阻塞狀態(tài)。signal 方法的效率會比 signalAll 高,但是它存在危險,因為它一次只解除一個線程的阻塞狀態(tài),因此,如果等待集中有多個線程都滿足了條件,也只能喚醒一個,其他的線程可能會導(dǎo)致死鎖
3 Condition 實例
消費生產(chǎn)者模式:
public class ConditionTest {
public static void main(String[] args) {
// 倉庫
Depot depot = new Depot(100);
// 消費者
Consumer consumer = new Consumer(depot);
// 生產(chǎn)者
Produce produce = new Produce(depot);
produce.produceThing(5);
consumer.consumerThing(5);
produce.produceThing(2);
consumer.consumerThing(5);
produce.produceThing(3);
}
}
class Depot {
private int capacity;
private int size;
private Lock lock;
private Condition consumerCond;
private Condition produceCond;
public Depot(int capacity) {
this.capacity = capacity;
this.size = 0;
this.lock = new ReentrantLock();
this.consumerCond = lock.newCondition();
this.produceCond = lock.newCondition();
}
public void produce(int val) {
lock.lock();
try {
int left = val;
while (left > 0) {
while (size >= capacity) {
produceCond.await();
}
int produce = (left+size) > capacity ? (capacity-size) : left;
size += produce;
left -= produce;
System.out.println(Thread.currentThread().getName() + ", ProduceVal=" + val + ", produce=" + produce + ", size=" + size);
consumerCond.signalAll();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void consumer(int val) {
lock.lock();
try {
int left = val;
while (left > 0) {
while (size <= 0) {
consumerCond.await();
}
int consumer = (size <= left) ? size : left;
size -= consumer;
left -= consumer;
System.out.println(Thread.currentThread().getName() + ", ConsumerVal=" + val + ", consumer=" + consumer + ", size=" + size);
produceCond.signalAll();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
class Consumer {
private Depot depot;
public Consumer(Depot depot) {
this.depot = depot;
}
public void consumerThing(final int amount) {
new Thread(new Runnable() {
public void run() {
depot.consumer(amount);
}
}).start();
}
}
class Produce {
private Depot depot;
public Produce(Depot depot) {
this.depot = depot;
}
public void produceThing(final int amount) {
new Thread(new Runnable() {
public void run() {
depot.produce(amount);
}
}).start();
}
}
Thread-0, ProduceVal=5, produce=5, size=5
Thread-1, ConsumerVal=5, consumer=5, size=0
Thread-2, ProduceVal=2, produce=2, size=2
Thread-3, ConsumerVal=5, consumer=2, size=0
Thread-4, ProduceVal=3, produce=3, size=3
Thread-3, ConsumerVal=5, consumer=3, size=0
輸出結(jié)果中,Thread-3出現(xiàn)兩次,就是因為要消費5個產(chǎn)品,但倉庫中只有2個產(chǎn)品,所以先將庫存的2個產(chǎn)品全部消費,然后這個線程進入等待隊列,等待生產(chǎn),隨后生產(chǎn)出了3個產(chǎn)品,生產(chǎn)者生產(chǎn)后又執(zhí)行signalAll方法將等待隊列中所有的線程都喚醒,Thread-3繼續(xù)消費還需要的3個產(chǎn)品。
三個線程依次打印ABC
class Business {
private Lock lock = new ReentrantLock();
private Condition conditionA = lock.newCondition();
private Condition conditionB = lock.newCondition();
private Condition conditionC = lock.newCondition();
private String type = "A"; //內(nèi)部狀態(tài)
/*
* 方法的基本要求為:
* 1、該方法必須為原子的。
* 2、當(dāng)前狀態(tài)必須滿足條件。若不滿足,則等待;滿足,則執(zhí)行業(yè)務(wù)代碼。
* 3、業(yè)務(wù)執(zhí)行完畢后,修改狀態(tài),并喚醒指定條件下的線程。
*/
public void printA() {
lock.lock(); //鎖,保證了線程安全。
try {
while (type != "A") { //type不為A,
try {
conditionA.await(); //將當(dāng)前線程阻塞于conditionA對象上,將被阻塞。
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//type為A,則執(zhí)行。
System.out.println(Thread.currentThread().getName() + " 正在打印A");
type = "B"; //將type設(shè)置為B。
conditionB.signal(); //喚醒在等待conditionB對象上的一個線程。將信號傳遞出去。
} finally {
lock.unlock(); //解鎖
}
}
public void printB() {
lock.lock(); //鎖
try {
while (type != "B") { //type不為B,
try {
conditionB.await(); //將當(dāng)前線程阻塞于conditionB對象上,將被阻塞。
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//type為B,則執(zhí)行。
System.out.println(Thread.currentThread().getName() + " 正在打印B");
type = "C"; //將type設(shè)置為C。
conditionC.signal(); //喚醒在等待conditionC對象上的一個線程。將信號傳遞出去。
} finally {
lock.unlock(); //解鎖
}
}
public void printC() {
lock.lock(); //鎖
try {
while (type != "C") {
try {
conditionC.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + " 正在打印C");
type = "A";
conditionA.signal();
} finally {
lock.unlock(); //解鎖
}
}
}
public class ConditionTest{
public static void main(String[] args) {
final Business business = new Business();//業(yè)務(wù)對象。
//線程1號,打印10次A。
Thread ta = new Thread(new Runnable() {
@Override
public void run() {
for(int i=0;i<10;i++){
business.printA();
}
}
});
//線程2號,打印10次B。
Thread tb = new Thread(new Runnable() {
@Override
public void run() {
for(int i=0;i<10;i++){
business.printB();
}
}
});
//線程3號,打印10次C。
Thread tc = new Thread(new Runnable() {
@Override
public void run() {
for(int i=0;i<10;i++){
business.printC();
}
}
});
//執(zhí)行3條線程。
ta.start();
tb.start();
tc.start();
}
}
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
虛假喚醒
所謂"虛假喚醒",即其他地方的代碼觸發(fā)了condition.signal() ,喚醒condition上等待的線程。但被喚醒的線程仍然不滿足執(zhí)行條件。
condition通常與條件語句一起使用:
if(!條件){
condition.await(); //不滿足條件,當(dāng)前線程等待;
}
更好的方法是使用while:
while(!條件){
condition.await(); //不滿足條件,當(dāng)前線程等待;
}
在等待Condition時,允許發(fā)生"虛假喚醒",這通常作為對基礎(chǔ)平臺語義的讓步。若使用"if(!條件)"則被"虛假喚醒"的線程可能繼續(xù)執(zhí)行。所以"while(!條件)"可以防止"虛假喚醒"。建議總是假定這些"虛假喚醒"可能發(fā)生,因此總是在一個循環(huán)中等待。
4、總結(jié)
如果知道Object的等待通知機制,Condition的使用是比較容易掌握的,因為和Object等待通知的使用基本一致。
對Condition的源碼理解,主要就是理解等待隊列,等待隊列可以類比同步隊列,而且等待隊列比同步隊列要簡單,因為等待隊列是單向隊列,同步隊列是雙向隊列。
以下是筆者對等待隊列是單向隊列、同步隊列是雙向隊列的一些思考,歡迎提出不同意見:
之所以同步隊列要設(shè)計成雙向的,是因為在同步隊列中,節(jié)點喚醒是接力式的,由每一個節(jié)點喚醒它的下一個節(jié)點,如果是由next指針獲取下一個節(jié)點,是有可能獲取失敗的,因為虛擬隊列每添加一個節(jié)點,是先用CAS把tail設(shè)置為新節(jié)點,然后才修改原tail的next指針到新節(jié)點的。因此用next向后遍歷是不安全的,但是如果在設(shè)置新節(jié)點為tail前,為新節(jié)點設(shè)置prev,則可以保證從tail往前遍歷是安全的。因此要安全的獲取一個節(jié)點Node的下一個節(jié)點,先要看next是不是null,如果是null,還要從tail往前遍歷看看能不能遍歷到Node。
而等待隊列就簡單多了,等待的線程就是等待者,只負責(zé)等待,喚醒的線程就是喚醒者,只負責(zé)喚醒,因此每次要執(zhí)行喚醒操作的時候,直接喚醒等待隊列的首節(jié)點就行了。等待隊列的實現(xiàn)中不需要遍歷隊列,因此也不需要prev指針。
到此這篇關(guān)于Java多線程之條件對象Condition的文章就介紹到這了,更多相關(guān)Java多線程 Condition內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java中處理金額計算之使用Long還是BigDecimal詳解
在Java后端開發(fā)中處理與錢有關(guān)的業(yè)務(wù)時,確保金額計算的準(zhǔn)確性和避免錯誤非常重要,這篇文章主要給大家介紹了關(guān)于Java中處理金額計算之使用Long還是BigDecimal的相關(guān)資料,需要的朋友可以參考下2024-07-07
MyBatis學(xué)習(xí)教程(二)—如何使用MyBatis對users表執(zhí)行CRUD操作
這篇文章主要介紹了MyBatis學(xué)習(xí)教程(二)—如何使用MyBatis對users表執(zhí)行CRUD操作的相關(guān)資料,需要的朋友可以參考下2016-05-05
關(guān)于Java父類沒有無參構(gòu)造方法子類處理方法
父類無參構(gòu)造方法,子類不寫,其實會默認調(diào)用父類的無參構(gòu)造方法也就是用super(),編譯運行后,會打印出"子類會調(diào)用Father的第一個構(gòu)造方法,這篇文章給大家介紹關(guān)于Java父類沒有無參構(gòu)造方法子類處理方法,感興趣的朋友一起看看吧2024-01-01
Java使用正則表達式實現(xiàn)找出數(shù)字功能示例
這篇文章主要介紹了Java使用正則表達式實現(xiàn)找出數(shù)字功能,結(jié)合實例形式分析了Java針對數(shù)字的匹配查找及非數(shù)字替換操作相關(guān)實現(xiàn)技巧,需要的朋友可以參考下2017-03-03
Java synchronized關(guān)鍵字和Lock接口實現(xiàn)原理
這篇文章主要介紹了Java synchronized關(guān)鍵字和Lock接口實現(xiàn)原理,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2019-12-12

