詳解Java并發(fā)包基石AQS
一、概述
AQS是一個用來構建鎖和同步器的框架,使用AQS能簡單且高效地構造出應用廣泛的大量的同步器,比如我們提到的ReentrantLock,Semaphore,其他的諸如ReentrantReadWriteLock,SynchronousQueue,F(xiàn)utureTask等等皆是基于AQS的。當然,我們自己也能利用AQS非常輕松容易地構造出符合我們自己需求的同步器。
本章我們就一起探究下這個神奇的東東,并對其實現(xiàn)原理進行剖析理解
二、基本實現(xiàn)原理
AQS使用一個int成員變量來表示同步狀態(tài),通過內置的FIFO隊列來完成獲取資源線程的排隊工作。
private volatile int state;//共享變量,使用volatile修飾保證線程可見性
狀態(tài)信息通過procted類型的getState,setState,compareAndSetState進行操作
AQS支持兩種同步方式:
1.獨占式
2.共享式
這樣方便使用者實現(xiàn)不同類型的同步組件,獨占式如ReentrantLock,共享式如Semaphore,CountDownLatch,組合式的如ReentrantReadWriteLock??傊?,AQS為使用提供了底層支撐,如何組裝實現(xiàn),使用者可以自由發(fā)揮。
同步器的設計是基于模板方法模式的,一般的使用方式是這樣:
1.使用者繼承AbstractQueuedSynchronizer并重寫指定的方法。(這些重寫方法很簡單,無非是對于共享資源state的獲取和釋放)
2.將AQS組合在自定義同步組件的實現(xiàn)中,并調用其模板方法,而這些模板方法會調用使用者重寫的方法。
這其實是模板方法模式的一個很經(jīng)典的應用。
我們來看看AQS定義的這些可重寫的方法:
- protected boolean tryAcquire(int arg) : 獨占式獲取同步狀態(tài),試著獲取,成功返回true,反之為false
- protected boolean tryRelease(int arg) :獨占式釋放同步狀態(tài),等待中的其他線程此時將有機會獲取到同步狀態(tài);
- protected int tryAcquireShared(int arg) :共享式獲取同步狀態(tài),返回值大于等于0,代表獲取成功;反之獲取失敗;
- protected boolean tryReleaseShared(int arg) :共享式釋放同步狀態(tài),成功為true,失敗為false
- protected boolean isHeldExclusively() : 是否在獨占模式下被線程占用。
關于AQS的使用,我們來簡單總結一下:
2.1、如何使用
首先,我們需要去繼承AbstractQueuedSynchronizer這個類,然后我們根據(jù)我們的需求去重寫相應的方法,比如要實現(xiàn)一個獨占鎖,那就去重寫tryAcquire,tryRelease方法,要實現(xiàn)共享鎖,就去重寫tryAcquireShared,tryReleaseShared;最后,在我們的組件中調用AQS中的模板方法就可以了,而這些模板方法是會調用到我們之前重寫的那些方法的。也就是說,我們只需要很小的工作量就可以實現(xiàn)自己的同步組件,重寫的那些方法,僅僅是一些簡單的對于共享資源state的獲取和釋放操作,至于像是獲取資源失敗,線程需要阻塞之類的操作,自然是AQS幫我們完成了。
2.2、設計思想
對于使用者來講,我們無需關心獲取資源失敗,線程排隊,線程阻塞/喚醒等一系列復雜的實現(xiàn),這些都在AQS中為我們處理好了。我們只需要負責好自己的那個環(huán)節(jié)就好,也就是獲取/釋放共享資源state的姿勢T_T。很經(jīng)典的模板方法設計模式的應用,AQS為我們定義好頂級邏輯的骨架,并提取出公用的線程入隊列/出隊列,阻塞/喚醒等一系列復雜邏輯的實現(xiàn),將部分簡單的可由使用者決定的操作邏輯延遲到子類中去實現(xiàn)即可。
三、自定義同步器
3.1、同步器代碼實現(xiàn)
上面大概講了一些關于AQS如何使用的理論性的東西,接下來,我們就來看下實際如何使用,直接采用JDK官方文檔中的小例子來說明問題
package juc;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class Mutex implements java.io.Serializable {
//靜態(tài)內部類,繼承AQS
private static class Sync extends AbstractQueuedSynchronizer {
//是否處于占用狀態(tài)
protected boolean isHeldExclusively() {
return getState() == 1;
}
//當狀態(tài)為0的時候獲取鎖,CAS操作成功,則state狀態(tài)為1,
public boolean tryAcquire(int acquires) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//釋放鎖,將同步狀態(tài)置為0
protected boolean tryRelease(int releases) {
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
}
//同步對象完成一系列復雜的操作,我們僅需指向它即可
private final Sync sync = new Sync();
//加鎖操作,代理到acquire(模板方法)上就行,acquire會調用我們重寫的tryAcquire方法
public void lock() {
sync.acquire(1);
}
public boolean tryLock() {
return sync.tryAcquire(1);
}
//釋放鎖,代理到release(模板方法)上就行,release會調用我們重寫的tryRelease方法。
public void unlock() {
sync.release(1);
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
}
3.2、同步器代碼測試
測試下這個自定義的同步器,我們使用之前文章中做過的并發(fā)環(huán)境下a++的例子來說明問題(a++的原子性其實最好使用原子類AtomicInteger來解決,此處用Mutex有點大炮打蚊子的意味,好在能說明問題就好)
package juc;
import java.util.concurrent.CyclicBarrier;
public class TestMutex {
private static CyclicBarrier barrier = new CyclicBarrier(31);
private static int a = 0;
private static Mutex mutex = new Mutex();
public static void main(String []args) throws Exception {
//說明:我們啟用30個線程,每個線程對i自加10000次,同步正常的話,最終結果應為300000;
//未加鎖前
for(int i=0;i<30;i++){
Thread t = new Thread(new Runnable() {
@Override
public void run() {
for(int i=0;i<10000;i++){
increment1();//沒有同步措施的a++;
}
try {
barrier.await();//等30個線程累加完畢
} catch (Exception e) {
e.printStackTrace();
}
}
});
t.start();
}
barrier.await();
System.out.println("加鎖前,a="+a);
//加鎖后
barrier.reset();//重置CyclicBarrier
a=0;
for(int i=0;i<30;i++){
new Thread(new Runnable() {
@Override
public void run() {
for(int i=0;i<10000;i++){
increment2();//a++采用Mutex進行同步處理
}
try {
barrier.await();//等30個線程累加完畢
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
barrier.await();
System.out.println("加鎖后,a="+a);
}
/**
* 沒有同步措施的a++
* @return
*/
public static void increment1(){
a++;
}
/**
* 使用自定義的Mutex進行同步處理的a++
*/
public static void increment2(){
mutex.lock();
a++;
mutex.unlock();
}
}
測試結果:
加鎖前,a=279204加鎖后,a=300000
四、源碼分析
我們先來簡單描述下AQS的基本實現(xiàn),前面我們提到過,AQS維護一個共享資源state,通過內置的FIFO來完成獲取資源線程的排隊工作。(這個內置的同步隊列稱為"CLH"隊列)。該隊列由一個一個的Node結點組成,每個Node結點維護一個prev引用和next引用,分別指向自己的前驅和后繼結點。AQS維護兩個指針,分別指向隊列頭部head和尾部tail。

其實就是個雙端雙向鏈表。
當線程獲取資源失?。ū热鐃ryAcquire時試圖設置state狀態(tài)失?。瑫粯嬙斐梢粋€結點加入CLH隊列中,同時當前線程會被阻塞在隊列中(通過LockSupport.park實現(xiàn),其實是等待態(tài))。當持有同步狀態(tài)的線程釋放同步狀態(tài)時,會喚醒后繼結點,然后此結點線程繼續(xù)加入到對同步狀態(tài)的爭奪中。
4.1、Node結點
Node結點是AbstractQueuedSynchronizer中的一個靜態(tài)內部類,我們撿Node的幾個重要屬性來說一下
static final class Node {
/** waitStatus值,表示線程已被取消(等待超時或者被中斷)*/
static final int CANCELLED = 1;
/** waitStatus值,表示后繼線程需要被喚醒(unpaking)*/
static final int SIGNAL = -1;
/**waitStatus值,表示結點線程等待在condition上,當被signal后,會從等待隊列轉移到同步到隊列中 */
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/** waitStatus值,表示下一次共享式同步狀態(tài)會被無條件地傳播下去
static final int PROPAGATE = -3;
/** 等待狀態(tài),初始為0 */
volatile int waitStatus;
/**當前結點的前驅結點 */
volatile Node prev;
/** 當前結點的后繼結點 */
volatile Node next;
/** 與當前結點關聯(lián)的排隊中的線程 */
volatile Thread thread;
/** ...... */
}
4.2、獨占式
獲取同步狀態(tài)--acquire()
來看看acquire方法,lock方法一般會直接代理到acquire上
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
我們來簡單理一下代碼邏輯:
a.首先,調用使用者重寫的tryAcquire方法,若返回true,意味著獲取同步狀態(tài)成功,后面的邏輯不再執(zhí)行;若返回false,也就是獲取同步狀態(tài)失敗,進入b步驟;
b.此時,獲取同步狀態(tài)失敗,構造獨占式同步結點,通過addWatiter將此結點添加到同步隊列的尾部(此時可能會有多個線程結點試圖加入同步隊列尾部,需要以線程安全的方 式添加);
c.該結點以在隊列中嘗試獲取同步狀態(tài),若獲取不到,則阻塞結點線程,直到被前驅結點喚醒或者被中斷。
addWaiter
為獲取同步狀態(tài)失敗的線程,構造成一個Node結點,添加到同步隊列尾部
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);//構造結點
//指向尾結點tail
Node pred = tail;
//如果尾結點不為空,CAS快速嘗試在尾部添加,若CAS設置成功,返回;否則,eng。
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
先cas快速設置,若失敗,進入enq方法
將結點添加到同步隊列尾部這個操作,同時可能會有多個線程嘗試添加到尾部,是非線程安全的操作。
以上代碼可以看出,使用了compareAndSetTail這個cas操作保證安全添加尾結點。
enq方法
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { //如果隊列為空,創(chuàng)建結點,同時被head和tail引用
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {//cas設置尾結點,不成功就一直重試
t.next = node;
return t;
}
}
}
}
enq內部是個死循環(huán),通過CAS設置尾結點,不成功就一直重試。很經(jīng)典的CAS自旋的用法,我們在之前關于原子類的源碼分析中也提到過。這是一種樂觀的并發(fā)策略。
最后,看下acquireQueued方法
acquireQueued
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {//死循環(huán)
final Node p = node.predecessor();//找到當前結點的前驅結點
if (p == head && tryAcquire(arg)) {//如果前驅結點是頭結點,才tryAcquire,其他結點是沒有機會tryAcquire的。
setHead(node);//獲取同步狀態(tài)成功,將當前結點設置為頭結點。
p.next = null; // 方便GC
failed = false;
return interrupted;
}
// 如果沒有獲取到同步狀態(tài),通過shouldParkAfterFailedAcquire判斷是否應該阻塞,parkAndCheckInterrupt用來阻塞線程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
acquireQueued內部也是一個死循環(huán),只有前驅結點是頭結點的結點,也就是老二結點,才有機會去tryAcquire;若tryAcquire成功,表示獲取同步狀態(tài)成功,將此結點設置為頭結點;若是非老二結點,或者tryAcquire失敗,則進入shouldParkAfterFailedAcquire去判斷判斷當前線程是否應該阻塞,若可以,調用parkAndCheckInterrupt阻塞當前線程,直到被中斷或者被前驅結點喚醒。若還不能休息,繼續(xù)循環(huán)。
shouldParkAfterFailedAcquire
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//獲取前驅結點的wait值
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)//若前驅結點的狀態(tài)是SIGNAL,意味著當前結點可以被安全地park
return true;
if (ws > 0) {
// ws>0,只有CANCEL狀態(tài)ws才大于0。若前驅結點處于CANCEL狀態(tài),也就是此結點線程已經(jīng)無效,從后往前遍歷,找到一個非CANCEL狀態(tài)的結點,將自己設置為它的后繼結點
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 若前驅結點為其他狀態(tài),將其設置為SIGNAL狀態(tài)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
若shouldParkAfterFailedAcquire返回true,也就是當前結點的前驅結點為SIGNAL狀態(tài),則意味著當前結點可以放心休息,進入parking狀態(tài)了。parkAncCheckInterrupt阻塞線程并處理中斷。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//使用LockSupport使線程進入阻塞狀態(tài)
return Thread.interrupted();// 線程是否被中斷過
}
至此,關于acquire的方法源碼已經(jīng)分析完畢,我們來簡單總結下
a.首先tryAcquire獲取同步狀態(tài),成功則直接返回;否則,進入下一環(huán)節(jié);
b.線程獲取同步狀態(tài)失敗,就構造一個結點,加入同步隊列中,這個過程要保證線程安全;
c.加入隊列中的結點線程進入自旋狀態(tài),若是老二結點(即前驅結點為頭結點),才有機會嘗試去獲取同步狀態(tài);否則,當其前驅結點的狀態(tài)為SIGNAL,線程便可安心休息,進入阻塞狀態(tài),直到被中斷或者被前驅結點喚醒。
釋放同步狀態(tài)--release()
當前線程執(zhí)行完自己的邏輯之后,需要釋放同步狀態(tài),來看看release方法的邏輯
public final boolean release(int arg) {
if (tryRelease(arg)) {//調用使用者重寫的tryRelease方法,若成功,喚醒其后繼結點,失敗則返回false
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//喚醒后繼結點
return true;
}
return false;
}
unparkSuccessor:喚醒后繼結點
private void unparkSuccessor(Node node) {
//獲取wait狀態(tài)
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);// 將等待狀態(tài)waitStatus設置為初始值0
Node s = node.next;//后繼結點
if (s == null || s.waitStatus > 0) {//若后繼結點為空,或狀態(tài)為CANCEL(已失效),則從后尾部往前遍歷找到一個處于正常阻塞狀態(tài)的結點 進行喚醒
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//使用LockSupprot喚醒結點對應的線程
}
release的同步狀態(tài)相對簡單,需要找到頭結點的后繼結點進行喚醒,若后繼結點為空或處于CANCEL狀態(tài),從后向前遍歷找尋一個正常的結點,喚醒其對應線程。
4.3、共享式
共享式:共享式地獲取同步狀態(tài)。對于獨占式同步組件來講,同一時刻只有一個線程能獲取到同步狀態(tài),其他線程都得去排隊等待,其待重寫的嘗試獲取同步狀態(tài)的方法tryAcquire返回值為boolean,這很容易理解;對于共享式同步組件來講,同一時刻可以有多個線程同時獲取到同步狀態(tài),這也是“共享”的意義所在。其待重寫的嘗試獲取同步狀態(tài)的方法tryAcquireShared返回值為int。
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
1.當返回值大于0時,表示獲取同步狀態(tài)成功,同時還有剩余同步狀態(tài)可供其他線程獲取;
2.當返回值等于0時,表示獲取同步狀態(tài)成功,但沒有可用同步狀態(tài)了;
3.當返回值小于0時,表示獲取同步狀態(tài)失敗。
獲取同步狀態(tài)--acquireShared
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)//返回值小于0,獲取同步狀態(tài)失敗,排隊去;獲取同步狀態(tài)成功,直接返回去干自己的事兒。
doAcquireShared(arg);
}
doAcquireShared
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);//構造一個共享結點,添加到同步隊列尾部。若隊列初始為空,先添加一個無意義的傀儡結點,再將新節(jié)點添加到隊列尾部。
boolean failed = true;//是否獲取成功
try {
boolean interrupted = false;//線程parking過程中是否被中斷過
for (;;) {//死循環(huán)
final Node p = node.predecessor();//找到前驅結點
if (p == head) {//頭結點持有同步狀態(tài),只有前驅是頭結點,才有機會嘗試獲取同步狀態(tài)
int r = tryAcquireShared(arg);//嘗試獲取同步裝填
if (r >= 0) {//r>=0,獲取成功
setHeadAndPropagate(node, r);//獲取成功就將當前結點設置為頭結點,若還有可用資源,傳播下去,也就是繼續(xù)喚醒后繼結點
p.next = null; // 方便GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&//是否能安心進入parking狀態(tài)
parkAndCheckInterrupt())//阻塞線程
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
大體邏輯與獨占式的acquireQueued差距不大,只不過由于是共享式,會有多個線程同時獲取到線程,也可能同時釋放線程,空出很多同步狀態(tài),所以當排隊中的老二獲取到同步狀態(tài),如果還有可用資源,會繼續(xù)傳播下去。
setHeadAndPropagate
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
釋放同步狀態(tài)--releaseShared
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();//釋放同步狀態(tài)
return true;
}
return false;
}
doReleaseShared
private void doReleaseShared() {
for (;;) {//死循環(huán),共享模式,持有同步狀態(tài)的線程可能有多個,采用循環(huán)CAS保證線程安全
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);//喚醒后繼結點
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
代碼邏輯比較容易理解,需要注意的是,共享模式,釋放同步狀態(tài)也是多線程的,此處采用了CAS自旋來保證。
五、總結
關于AQS的介紹及源碼分析到此為止了。
AQS是JUC中很多同步組件的構建基礎,簡單來講,它內部實現(xiàn)主要是狀態(tài)變量state和一個FIFO隊列來完成,同步隊列的頭結點是當前獲取到同步狀態(tài)的結點,獲取同步狀態(tài)state失敗的線程,會被構造成一個結點(或共享式或獨占式)加入到同步隊列尾部(采用自旋CAS來保證此操作的線程安全),隨后線程會阻塞;釋放時喚醒頭結點的后繼結點,使其加入對同步狀態(tài)的爭奪中。
AQS為我們定義好了頂層的處理實現(xiàn)邏輯,我們在使用AQS構建符合我們需求的同步組件時,只需重寫tryAcquire,tryAcquireShared,tryRelease,tryReleaseShared幾個方法,來決定同步狀態(tài)的釋放和獲取即可,至于背后復雜的線程排隊,線程阻塞/喚醒,如何保證線程安全,都由AQS為我們完成了,這也是非常典型的模板方法的應用。AQS定義好頂級邏輯的骨架,并提取出公用的線程入隊列/出隊列,阻塞/喚醒等一系列復雜邏輯的實現(xiàn),將部分簡單的可由使用者決定的操作邏輯延遲到子類中去實現(xiàn)。
以上就是詳解Java并發(fā)包基石AQS的詳細內容,更多關于Java并發(fā)包基石AQS的資料請關注腳本之家其它相關文章!
相關文章
SpringSecurity+JWT實現(xiàn)前后端分離的使用詳解
這篇文章主要介紹了SpringSecurity+JWT實現(xiàn)前后端分離的使用詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2021-01-01
mybatis-flex與springBoot整合的實現(xiàn)示例
Mybatis-flex提供了簡單易用的API,開發(fā)者只需要簡單的配置即可使用,本文主要介紹了mybatis-flex與springBoot整合,具有一定的參考價值,感興趣的可以了解一下2024-01-01
Springboot+WebSocket實現(xiàn)在線聊天功能
WebSocket協(xié)議是基于TCP的一種新的網(wǎng)絡協(xié)議。這篇文章主要為大家介紹了如何利用Springboot和WebSocket實現(xiàn)在線聊天功能,感興趣的小伙伴可以了解一下2023-02-02
SpringMVC MethodArgumentResolver的作用與實現(xiàn)
這篇文章主要介紹了SpringMVC MethodArgumentResolver的作用與實現(xiàn),MethodArgumentResolver采用一種策略模式,在Handler的方法被調用前,Spring MVC會自動將HTTP請求中的參數(shù)轉換成方法參數(shù)2023-04-04
Spring Boot 會員管理系統(tǒng)之處理文件上傳功能
Spring Boot會員管理系統(tǒng)的中,需要涉及到Spring框架,SpringMVC框架,Hibernate框架,thymeleaf模板引擎。這篇文章主要介紹了Spring Boot會員管理系統(tǒng)之處理文件上傳功能,需要的朋友可以參考下2018-03-03

