解析java中的condition
一、condition 介紹及demo
Condition是在java 1.5中才出現(xiàn)的,它用來(lái)替代傳統(tǒng)的Object的wait()、notify()實(shí)現(xiàn)線程間的協(xié)作,相比使用Object的wait()、notify(),使用Condition的await()、signal()這種方式實(shí)現(xiàn)線程間協(xié)作更加安全和高效。因此通常來(lái)說(shuō)比較推薦使用Condition,阻塞隊(duì)列實(shí)際上是使用了Condition來(lái)模擬線程間協(xié)作。
- Condition是個(gè)接口,基本的方法就是await()和signal()方法;
- Condition依賴于Lock接口,生成一個(gè)Condition的基本代碼是lock.newCondition()
- 調(diào)用Condition的await()和signal()方法,都必須在lock保護(hù)之內(nèi),就是說(shuō)必須在lock.lock()和lock.unlock之間才可以使用
Conditon中的await()對(duì)應(yīng)Object的wait();
Condition中的signal()對(duì)應(yīng)Object的notify();
Condition中的signalAll()對(duì)應(yīng)Object的notifyAll()。

condition常見(jiàn)例子arrayblockingqueue。下面是demo:
package thread;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConTest {
final Lock lock = new ReentrantLock();
final Condition condition = lock.newCondition();
public static void main(String[] args) {
// TODO Auto-generated method stub
ConTest test = new ConTest();
Producer producer = test.new Producer();
Consumer consumer = test.new Consumer();
consumer.start();
producer.start();
}
class Consumer extends Thread{
@Override
public void run() {
consume();
}
private void consume() {
try {
lock.lock();
System.out.println("我在等一個(gè)新信號(hào)"+this.currentThread().getName());
condition.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally{
System.out.println("拿到一個(gè)信號(hào)"+this.currentThread().getName());
lock.unlock();
}
}
}
class Producer extends Thread{
@Override
public void run() {
produce();
}
private void produce() {
try {
lock.lock();
System.out.println("我拿到鎖"+this.currentThread().getName());
condition.signalAll();
System.out.println("我發(fā)出了一個(gè)信號(hào):"+this.currentThread().getName());
} finally{
lock.unlock();
}
}
}
}
運(yùn)行結(jié)果:

Condition的執(zhí)行方式,是當(dāng)在線程Consumer中調(diào)用await方法后,線程Consumer將釋放鎖,并且將自己沉睡,等待喚醒,線程Producer獲取到鎖后,開(kāi)始做事,完畢后,調(diào)用Condition的signalall方法,喚醒線程Consumer,線程Consumer恢復(fù)執(zhí)行。
以上說(shuō)明Condition是一個(gè)多線程間協(xié)調(diào)通信的工具類,使得某個(gè),或者某些線程一起等待某個(gè)條件(Condition),只有當(dāng)該條件具備( signal 或者 signalAll方法被帶調(diào)用)時(shí) ,這些等待線程才會(huì)被喚醒,從而重新?tīng)?zhēng)奪鎖。
Condition實(shí)現(xiàn)生產(chǎn)者、消費(fèi)者模式:
package thread;
import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConTest2 {
private int queueSize = 10;
private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
private Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
public static void main(String[] args) throws InterruptedException {
ConTest2 test = new ConTest2();
Producer producer = test.new Producer();
Consumer consumer = test.new Consumer();
producer.start();
consumer.start();
Thread.sleep(0);
producer.interrupt();
consumer.interrupt();
}
class Consumer extends Thread{
@Override
public void run() {
consume();
}
volatile boolean flag=true;
private void consume() {
while(flag){
lock.lock();
try {
while(queue.isEmpty()){
try {
System.out.println("隊(duì)列空,等待數(shù)據(jù)");
notEmpty.await();
} catch (InterruptedException e) {
flag =false;
}
}
queue.poll(); //每次移走隊(duì)首元素
notFull.signal();
System.out.println("從隊(duì)列取走一個(gè)元素,隊(duì)列剩余"+queue.size()+"個(gè)元素");
} finally{
lock.unlock();
}
}
}
}
class Producer extends Thread{
@Override
public void run() {
produce();
}
volatile boolean flag=true;
private void produce() {
while(flag){
lock.lock();
try {
while(queue.size() == queueSize){
try {
System.out.println("隊(duì)列滿,等待有空余空間");
notFull.await();
} catch (InterruptedException e) {
flag =false;
}
}
queue.offer(1); //每次插入一個(gè)元素
notEmpty.signal();
System.out.println("向隊(duì)列取中插入一個(gè)元素,隊(duì)列剩余空間:"+(queueSize-queue.size()));
} finally{
lock.unlock();
}
}
}
}
}
運(yùn)行結(jié)果如下:

二、Condition接口
condition可以通俗的理解為條件隊(duì)列。當(dāng)一個(gè)線程在調(diào)用了await方法以后,直到線程等待的某個(gè)條件為真的時(shí)候才會(huì)被喚醒。這種方式為線程提供了更加簡(jiǎn)單的等待/通知模式。Condition必須要配合鎖一起使用,因?yàn)閷?duì)共享狀態(tài)變量的訪問(wèn)發(fā)生在多線程環(huán)境下。一個(gè)Condition的實(shí)例必須與一個(gè)Lock綁定,因此Condition一般都是作為L(zhǎng)ock的內(nèi)部實(shí)現(xiàn)。
await() :造成當(dāng)前線程在接到信號(hào)或被中斷之前一直處于等待狀態(tài)。
await(long time, TimeUnit unit) :造成當(dāng)前線程在接到信號(hào)、被中斷或到達(dá)指定等待時(shí)間之前一直處于等待狀態(tài)
awaitNanos(long nanosTimeout) :造成當(dāng)前線程在接到信號(hào)、被中斷或到達(dá)指定等待時(shí)間之前一直處于等待狀態(tài)。
返回值表示剩余時(shí)間,如果在nanosTimesout之前喚醒,那么返回值 = nanosTimeout - 消耗時(shí)間,如果返回值 <= 0 ,則可以認(rèn)定它已經(jīng)超時(shí)了。
awaitUninterruptibly() :造成當(dāng)前線程在接到信號(hào)之前一直處于等待狀態(tài)。【注意:該方法對(duì)中斷不敏感】。
awaitUntil(Date deadline) :造成當(dāng)前線程在接到信號(hào)、被中斷或到達(dá)指定最后期限之前一直處于等待狀態(tài)。如果沒(méi)有到指定時(shí)間就被通知,則返回true,否則表示到了指定時(shí)間,返回返回false。
signal() :?jiǎn)拘岩粋€(gè)等待線程。該線程從等待方法返回前必須獲得與Condition相關(guān)的鎖。
signal()All :?jiǎn)拘阉械却€程。能夠從等待方法返回的線程必須獲得與Condition相關(guān)的鎖。
三、condition實(shí)現(xiàn)分析

- Condition接口包含了多種await方式和兩個(gè)通知方法
- ConditionObject實(shí)現(xiàn)了Condition接口,是AbstractQueuedSynchronizer的內(nèi)部類(因?yàn)镃ondition的操作都需要獲取想關(guān)聯(lián)的鎖)
- Reentrantlock的newCondition方法返回與某個(gè)lock實(shí)例相關(guān)的Condition對(duì)象
public abstract class AbstractQueuedLongSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
結(jié)合上面的類圖,我們看到condition實(shí)現(xiàn)是依賴于aqs,而aqs是個(gè)抽象類。里面定義了同步器的基本框架,實(shí)現(xiàn)了基本的結(jié)構(gòu)功能。只留有狀態(tài)條件的維護(hù)由具體同步器根據(jù)具體場(chǎng)景來(lái)定制,如常見(jiàn)的 ReentrantLock 、 RetrantReadWriteLock和CountDownLatch 等等,
3.1、等待隊(duì)列
Condition是AQS的內(nèi)部類。每個(gè)Condition對(duì)象都包含一個(gè)隊(duì)列(等待隊(duì)列)。等待隊(duì)列是一個(gè)FIFO的隊(duì)列,在隊(duì)列中的每個(gè)節(jié)點(diǎn)都包含了一個(gè)線程引用,該線程就是在Condition對(duì)象上等待的線程,如果一個(gè)線程調(diào)用了Condition.await()方法,那么該線程將會(huì)釋放鎖、構(gòu)造成節(jié)點(diǎn)加入等待隊(duì)列并進(jìn)入等待狀態(tài)。AQS有一個(gè)同步隊(duì)列和多個(gè)等待隊(duì)列,節(jié)點(diǎn)都是Node。等待隊(duì)列的基本結(jié)構(gòu)如下所示。

等待分為首節(jié)點(diǎn)和尾節(jié)點(diǎn)。當(dāng)一個(gè)線程調(diào)用Condition.await()方法,將會(huì)以當(dāng)前線程構(gòu)造節(jié)點(diǎn),并將節(jié)點(diǎn)從尾部加入等待隊(duì)列。新增節(jié)點(diǎn)就是將尾部節(jié)點(diǎn)指向新增的節(jié)點(diǎn)。節(jié)點(diǎn)引用更新本來(lái)就是在獲取鎖以后的操作,所以不需要CAS保證。同時(shí)也是線程安全的操作。
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
3.2、等待
當(dāng)線程調(diào)用了Condition的await()方法以后。線程就作為隊(duì)列中的一個(gè)節(jié)點(diǎn)被加入到等待隊(duì)列中去了。同時(shí)會(huì)釋放鎖的擁有。當(dāng)從await方法返回的時(shí)候。當(dāng)前線程一定會(huì)獲取condition相關(guān)聯(lián)的鎖。
如果從隊(duì)列(同步隊(duì)列和等待隊(duì)列)的角度去看await()方法,當(dāng)調(diào)用await()方法時(shí),相當(dāng)于同步隊(duì)列的首節(jié)點(diǎn)(獲取鎖的節(jié)點(diǎn))移動(dòng)到Condition的等待隊(duì)列中。
調(diào)用該方法的線程成功的獲取鎖的線程,也就是同步隊(duì)列的首節(jié)點(diǎn),該方法會(huì)將當(dāng)前線程構(gòu)造成節(jié)點(diǎn)并加入到等待隊(duì)列中,然后釋放同步狀態(tài),喚醒同步隊(duì)列中的后繼節(jié)點(diǎn),然后當(dāng)前線程會(huì)進(jìn)入等待狀態(tài)。
當(dāng)?shù)却?duì)列中的節(jié)點(diǎn)被喚醒的時(shí)候,則喚醒節(jié)點(diǎn)的線程開(kāi)始嘗試獲取同步狀態(tài)。如果不是通過(guò) 其他線程調(diào)用Condition.signal()方法喚醒,而是對(duì)等待線程進(jìn)行中斷,則會(huì)拋出InterruptedException異常信息。

我們看一下這個(gè)await的方法,它是AQS的方法,
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter(); //將當(dāng)前線程包裝下后,
//添加到Condition自己維護(hù)的一個(gè)鏈表中。
int savedState = fullyRelease(node);//釋放當(dāng)前線程占有的鎖,從demo中看到,
//調(diào)用await前,當(dāng)前線程是占有鎖的
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
//釋放完畢后,遍歷AQS的隊(duì)列,看當(dāng)前節(jié)點(diǎn)是否在隊(duì)列中,
//不在 說(shuō)明它還沒(méi)有競(jìng)爭(zhēng)鎖的資格,所以繼續(xù)將自己沉睡。
//直到它被加入到隊(duì)列中,聰明的你可能猜到了,
//沒(méi)有錯(cuò),在singal的時(shí)候加入不就可以了?
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//被喚醒后,重新開(kāi)始正式競(jìng)爭(zhēng)鎖,同樣,如果競(jìng)爭(zhēng)不到還是會(huì)將自己沉睡,等待喚醒重新開(kāi)始競(jìng)爭(zhēng)。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
結(jié)合代碼去看,同步隊(duì)列的首節(jié)點(diǎn) 并不會(huì)直接加入等待隊(duì)列,而是通過(guò)addConditionWaiter把當(dāng)前線程構(gòu)造成一個(gè)新節(jié)點(diǎn)并加入到等待隊(duì)列中。
/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
3.3、通知
調(diào)用Condition的signal()方法,將會(huì)喚醒在等待隊(duì)列中等待最長(zhǎng)時(shí)間的節(jié)點(diǎn)(條件隊(duì)列里的首節(jié)點(diǎn)),在喚醒節(jié)點(diǎn)前,會(huì)將節(jié)點(diǎn)移到同步隊(duì)列中。當(dāng)前線程加入到等待隊(duì)列中如圖所示:

回到上面的demo,鎖被釋放后,線程Consumer開(kāi)始沉睡,這個(gè)時(shí)候線程因?yàn)榫€程Consumer沉睡時(shí),會(huì)喚醒AQS隊(duì)列中的頭結(jié)點(diǎn),所所以線程Producer會(huì)開(kāi)始競(jìng)爭(zhēng)鎖,并獲取到,執(zhí)行完后線程Producer會(huì)調(diào)用signal方法,“發(fā)出”signal信號(hào),signal方法如下:
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter; //firstWaiter為condition自己維護(hù)的一個(gè)鏈表的頭結(jié)點(diǎn),
//取出第一個(gè)節(jié)點(diǎn)后開(kāi)始喚醒操作
if (first != null)
doSignal(first);
}
在調(diào)用signal()方法之前必須先判斷是否獲取到了鎖(isHeldExclusively方法)。接著獲取等待隊(duì)列的首節(jié)點(diǎn),將其移動(dòng)到同步隊(duì)列并且利用LockSupport喚醒節(jié)點(diǎn)中的線程。
被喚醒的線程將從await方法中的while循環(huán)中退出( while (!isOnSyncQueue(node)) { 方法返回true,節(jié)點(diǎn)已經(jīng)在同步隊(duì)列中)。隨后調(diào)用同步器的acquireQueued()方法加入到同步狀態(tài)的競(jìng)爭(zhēng)當(dāng)中去。成功獲取到競(jìng)爭(zhēng)的線程從先前調(diào)用await方法返回,此時(shí)該線程已經(jīng)成功獲取了鎖。
*********************************************
AQS的同步隊(duì)列與Condition的等待隊(duì)列,兩個(gè)隊(duì)列的作用是不同,事實(shí)上,每個(gè)線程也僅僅會(huì)同時(shí)存在以上兩個(gè)隊(duì)列中的一個(gè),流程是這樣的:

注意:
1.線程producer調(diào)用signal方法,這個(gè)時(shí)候Condition的等待隊(duì)列中只有線程Consumer一個(gè)節(jié)點(diǎn),于是它被取出來(lái),并被加入到AQS的等待隊(duì)列中。 注意,這個(gè)時(shí)候,線程Consumer 并沒(méi)有被喚醒。
2.Sync是AQS的抽象子類,實(shí)現(xiàn)可重入和互斥的大部分功能。在Sync的子類中有FairSync和NonfairSync兩種代表公平鎖策略和非公平鎖策略。Sync lock方法留給子類去實(shí)現(xiàn),NonfairSync的實(shí)現(xiàn):
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
其中如果一開(kāi)始獲取鎖成功,是直接設(shè)置當(dāng)前線程。
否則執(zhí)行acquire(1),也就是進(jìn)入aqs等待隊(duì)列。這里不展開(kāi)細(xì)節(jié)。
可以這樣理解,整個(gè)協(xié)作過(guò)程是靠結(jié)點(diǎn)在AQS的等待隊(duì)列和Condition的等待隊(duì)列中來(lái)回移動(dòng)實(shí)現(xiàn)的,每個(gè)隊(duì)列的意義不同,Condition作為一個(gè)條件類,很好的自己維護(hù)了一個(gè)等待信號(hào)的隊(duì)列,并在適時(shí)的時(shí)候?qū)⒔Y(jié)點(diǎn)加入到AQS的等待隊(duì)列中來(lái)實(shí)現(xiàn)的喚醒操作
以上就是解析java中的condition的詳細(xì)內(nèi)容,更多關(guān)于java condition的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
使用Spring?Retry實(shí)現(xiàn)業(yè)務(wù)異常重試
在系統(tǒng)中經(jīng)常遇到業(yè)務(wù)重試的邏輯,比如三方接口調(diào)用,timeout重試三遍,異常處理重試的兜底邏輯等,本文給大家介紹一下如何使用Spring?Retry優(yōu)雅的實(shí)現(xiàn)業(yè)務(wù)異常重試,需要的朋友可以參考下2024-01-01
Java加載與存儲(chǔ)指令之ldc與_fast_aldc指令
ldc指令將int、float、或者一個(gè)類、方法類型或方法句柄的符號(hào)引用、還可能是String型常量值從常量池中推送至棧頂。這一篇介紹一個(gè)虛擬機(jī)規(guī)范中定義的一個(gè)字節(jié)碼指令ldc,另外還有一個(gè)虛擬機(jī)內(nèi)部使用的字節(jié)碼指令_fast_aldc。需要的盆友可參考下面文章的內(nèi)容2021-09-09
idea注解參數(shù)換行時(shí)間日期格式設(shè)置方法
這篇文章主要介紹了idea注解參數(shù)換行時(shí)間日期格式設(shè)置方法,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-05-05
redis?redisTemplate數(shù)據(jù)類型基礎(chǔ)操作
這篇文章主要介紹了redis?redisTemplate數(shù)據(jù)類型基礎(chǔ)操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-06-06
springboot與springmvc基礎(chǔ)入門(mén)講解
本篇文章主要介紹了詳解快速搭建Spring Boot+Spring MVC,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2021-07-07
mybatisplus開(kāi)啟sql打印的三種方式匯總
這篇文章主要介紹了mybatisplus開(kāi)啟sql打印的三種方式,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2024-01-01

