詳解Java七大阻塞隊(duì)列之SynchronousQueue
其實(shí)SynchronousQueue 是一個(gè)特別有意思的阻塞隊(duì)列,就我個(gè)人理解來說,它很重要的特點(diǎn)就是沒有容量。
直接看一個(gè)例子:
package dongguabai.test.juc.test;
import java.util.concurrent.SynchronousQueue;
/**
* @author Dongguabai
* @description
* @date 2021-09-01 21:52
*/
public class TestSynchronousQueue {
public static void main(String[] args) {
SynchronousQueue synchronousQueue = new SynchronousQueue();
boolean add = synchronousQueue.add("1");
System.out.println(add);
}
}
代碼很簡單,就是往 SynchronousQueue 里放了一個(gè)元素,程序卻拋異常了:
Exception in thread "main" java.lang.IllegalStateException: Queue full at java.util.AbstractQueue.add(AbstractQueue.java:98) at dongguabai.test.juc.test.TestSynchronousQueue.main(TestSynchronousQueue.java:14)
而異常原因是隊(duì)列滿了。剛剛使用的是 SynchronousQueue#add 方法,現(xiàn)在來看看 SynchronousQueue#put 方法:
public static void main(String[] args) throws InterruptedException {
SynchronousQueue synchronousQueue = new SynchronousQueue();
synchronousQueue.put("1");
System.out.println("----");
}
看到 InterruptedException 其實(shí)就能猜出這個(gè)方法肯定會(huì)阻塞當(dāng)前線程。
通過這兩個(gè)例子,也就解釋了 SynchronousQueue 隊(duì)列是沒有容量的,也就是說在往 SynchronousQueue 中添加元素之前,得先向 SynchronousQueue 中取出元素,這句話聽著很別扭,那可以換個(gè)角度猜想其實(shí)現(xiàn)原理,調(diào)用取出方法的時(shí)候設(shè)置了一個(gè)“已經(jīng)有線程在等待取出”的標(biāo)識(shí),線程等待,然后添加元素的時(shí)候,先看這個(gè)標(biāo)識(shí),如果有線程在等待取出,則添加成功,反之則拋出異?;蛘咦枞?/p>
分析
接下來從 SynchronousQueue#put 方法開始進(jìn)行分析:
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
可以發(fā)現(xiàn)是調(diào)用的 Transferer#transfer 方法,這個(gè) Transferer 是在構(gòu)造 SynchronousQueue 的時(shí)候初始化的:
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
SynchronousQueue 有兩種模式,公平與非公平,默認(rèn)是非公平,非公平使用的就是 TransferStack,是基于單向鏈表做的:
static final class SNode {
volatile SNode next; // next node in stack
volatile SNode match; // the node matched to this
volatile Thread waiter; // to control park/unpark
Object item; // data; or null for REQUESTs
int mode;
...
}
那么重點(diǎn)就是 SynchronousQueue.TransferStack#transfer 方法了,從方法名都可以看出這是用來做數(shù)據(jù)交換的,但是這個(gè)方法有好幾十行,里面各種 Node 指針搞來搞去,這個(gè)地方我覺得沒必要過于糾結(jié)細(xì)節(jié),老規(guī)矩,抓大放小,而且隊(duì)列這種,很方便進(jìn)行 Debug 調(diào)試。
再理一下思路:
- 今天研究的是阻塞隊(duì)列,關(guān)注阻塞的話,更應(yīng)該關(guān)系的是
take和put方法; Transferer是一個(gè)抽象類,只有一個(gè)transfer方法,即take和put共用,那就肯定是基于入?yún)⑦M(jìn)行功能的區(qū)分;take和put方法底層都調(diào)用的SynchronousQueue.TransferStack#transfer方法;
將上面 SynchronousQueue#put 使用的例子修改一下,再加一個(gè)線程take:
package dongguabai.test.juc.test;
import java.util.Date;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
/**
* @author Dongguabai
* @description
* @date 2021-09-01 21:52
*/
public class TestSynchronousQueue {
public static void main(String[] args) throws InterruptedException {
SynchronousQueue synchronousQueue = new SynchronousQueue();
new Thread(()->{
System.out.println(new Date().toLocaleString()+"::"+Thread.currentThread().getName()+"-put了數(shù)據(jù):"+"1");
try {
synchronousQueue.put("1");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
System.out.println("----");
new Thread(()->{
Object take = null;
try {
take = synchronousQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(new Date().toLocaleString()+"::"+Thread.currentThread().getName()+"-take到了數(shù)據(jù):"+take);
}).start();
TimeUnit.SECONDS.sleep(1);
System.out.println("結(jié)束...");
}
}
整個(gè)程序結(jié)束,并且輸出:
----
2021-9-2 0:58:55::Thread-0-put了數(shù)據(jù):1
2021-9-2 0:58:55::Thread-1-take到了數(shù)據(jù):1
結(jié)束...
也就是說當(dāng)一個(gè)線程在 put 的時(shí)候,如果有線程 take ,那么 put 線程可以正常運(yùn)行,不會(huì)被阻塞。
基于這個(gè)例子,再結(jié)合上文的猜想,也就是說核心點(diǎn)就是找到 put 的時(shí)候現(xiàn)在已經(jīng)有線程在 take 的標(biāo)識(shí),或者 take 的時(shí)候已經(jīng)有線程在 put,這個(gè)標(biāo)識(shí)不一定是變量,結(jié)合 AQS 的原理來看,很可能是根據(jù)鏈表中的 Node 進(jìn)行判斷。
接下來看 SynchronousQueue.put 方法:
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
它底層也是調(diào)用的 SynchronousQueue.TransferStack#transfer 方法,但是傳入?yún)?shù)是當(dāng)前 put 的元素、false 和 0。再回過頭看 SynchronousQueue.TransferStack#transfer 方法:
E transfer(E e, boolean timed, long nanos) {
SNode s = null; // constructed/reused as needed
//這里的參數(shù)e就是要put的元素,顯然不為null,也就是說是DATA模式,根據(jù)注釋,DATA模式就說明當(dāng)前線程是producer
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
if (h == null || h.mode == mode) { // empty or same-mode
if (timed && nanos <= 0) { // can't wait
if (h != null && h.isCancelled())
casHead(h, h.next); // pop cancelled node
else
return null;
} else if (casHead(h, s = snode(s, e, h, mode))) {
//因?yàn)榈谝淮蝡ut那么h肯定為null,這里入?yún)imed為false,所以會(huì)到這里,執(zhí)行awaitFulfill方法,根據(jù)名稱可以猜想出是一個(gè)阻塞方法
SNode m = awaitFulfill(s, timed, nanos);
if (m == s) { // wait was cancelled
clean(s);
return null;
}
....
}
這里首先會(huì)構(gòu)造一個(gè) SNode,然后執(zhí)行 casHead 函數(shù),其實(shí)最終棧結(jié)構(gòu)就是:
head->put_e
就是 head 會(huì)指向 put 的元素對(duì)應(yīng)的 SNode。
然后會(huì)執(zhí)行 awaitFulfill 方法:
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())
s.tryCancel();
SNode m = s.match;
if (m != null)
return m;
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel();
continue;
}
}
if (spins > 0)
spins = shouldSpin(s) ? (spins-1) : 0; //自旋機(jī)制
else if (s.waiter == null)
s.waiter = w; // establish waiter so can park next iter
else if (!timed)
LockSupport.park(this); //阻塞
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
最終還是會(huì)使用 LockSupport 進(jìn)行阻塞,等待喚醒。
已經(jīng)大致過了一遍流程了,細(xì)節(jié)方面就不再糾結(jié)了,那么假如再put 一個(gè)元素呢,其實(shí)結(jié)合源碼已經(jīng)可以分析出此時(shí)棧的結(jié)果為:
head-->put_e_1-->put_e
避免分析出錯(cuò),寫個(gè) Debug 的代碼驗(yàn)證一下:
package dongguabai.test.juc.test;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
/**
* @author Dongguabai
* @description
* @date 2021-09-02 02:15
*/
public class DebugPut2E {
public static void main(String[] args) throws InterruptedException {
SynchronousQueue synchronousQueue = new SynchronousQueue();
new Thread(()-> {
try {
synchronousQueue.put("1");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
TimeUnit.SECONDS.sleep(1);
new Thread(()-> {
try {
synchronousQueue.put("2");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
在 SynchronousQueue.TransferStack#awaitFulfill 方法的 LockSupport.park(this); 處打上斷點(diǎn),運(yùn)行上面的代碼,再看看現(xiàn)在的 head:

的確與分析的一致。
也就是先進(jìn)后出。再看 take 方法:
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
調(diào)用的 SynchronousQueue.TransferStack#transfer 方法,但是傳入?yún)?shù)是 null、false 和 0。
偷個(gè)懶就不分析源碼了,直接 Debug 走一遍,代碼如下:
package dongguabai.test.juc.test;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
/**
* @author Dongguabai
* @description
* @date 2021-09-02 02:24
*/
public class DebugTake {
public static void main(String[] args) throws InterruptedException {
SynchronousQueue synchronousQueue = new SynchronousQueue();
new Thread(()-> {
try {
synchronousQueue.put("1");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"Thread-put-1").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()-> {
try {
synchronousQueue.put("2");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"Thread-put-2").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()->{
try {
Object take = synchronousQueue.take();
System.out.println("======take:"+take);
} catch (InterruptedException e) {
e.printStackTrace();
}
},"Thread-Take").start();
}
}
在 SynchronousQueue#take 方法中打上斷點(diǎn),運(yùn)行上面的代碼:

這里的 s 就是 head,m 就是棧頂?shù)脑?,也是最近一?put 的元素。說白了 take 就是取的棧頂?shù)脑?,最后再匹配一下,符合條件就直接取出來。take 之后 head 為:

棧的結(jié)構(gòu)為:
head-->put_e
最后再把整個(gè)流程梳理一遍:
執(zhí)行 put 操作的時(shí)候,每次壓入棧頂;take 的時(shí)候就取棧頂?shù)脑兀聪冗M(jìn)后出;這也就實(shí)現(xiàn)了非公平;
至于公平模式,結(jié)合 TransferStack 的實(shí)現(xiàn),可以猜測實(shí)現(xiàn)就是 put 的時(shí)候放入隊(duì)列,take 的時(shí)候從隊(duì)列頭部開始取,先進(jìn)先出。
那么這個(gè)隊(duì)列設(shè)計(jì)的優(yōu)勢使用場景在哪里呢?個(gè)人感覺它的優(yōu)勢就是完全不會(huì)產(chǎn)生對(duì)隊(duì)列中數(shù)據(jù)的爭搶,因?yàn)檎f白了隊(duì)列是空的,從某種程度上來說消費(fèi)速率是很快的。
至于使用場景,我這邊的確沒有想到比較好的使用場景。結(jié)合組內(nèi)同學(xué)的使用來看,他選擇使用這個(gè)隊(duì)列的原因是因?yàn)樗粫?huì)在內(nèi)存中生成任務(wù)隊(duì)列,當(dāng)服務(wù)宕機(jī)后不用擔(dān)心內(nèi)存中任務(wù)的丟失(非優(yōu)雅停機(jī)的情況)。經(jīng)過討論后發(fā)現(xiàn)即使使用了 SynchronousQueue 也無法有效的避免任務(wù)丟失,但這的確是一個(gè)思路,沒準(zhǔn)以后在其他場景中用得上。
到此這篇關(guān)于詳解Java七大阻塞隊(duì)列之SynchronousQueue的文章就介紹到這了,更多相關(guān)Java阻塞隊(duì)列 SynchronousQueue內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java跳出當(dāng)前的多重嵌套循環(huán)的五種方法
在Java編程中,跳出多重嵌套循環(huán)可以使用break語句、標(biāo)號(hào)與break組合、return語句、標(biāo)志變量和異常處理五種方法,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-10-10
IDEA之項(xiàng)目run按鈕為灰色,無法運(yùn)行問題
這篇文章主要介紹了IDEA之項(xiàng)目run按鈕為灰色,無法運(yùn)行問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-12-12
Spring Boot Dubbo 構(gòu)建分布式服務(wù)的方法
這篇文章主要介紹了Spring Boot Dubbo 構(gòu)建分布式服務(wù)的方法,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2019-05-05
java如何實(shí)時(shí)動(dòng)態(tài)獲取properties文件的內(nèi)容
這篇文章主要介紹了java如何實(shí)時(shí)動(dòng)態(tài)獲取properties文件的內(nèi)容,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09
SpringCloud如何利用Feign訪問外部http請(qǐng)求
這篇文章主要介紹了SpringCloud如何利用Feign訪問外部http請(qǐng)求,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03
Java利用EasyExcel解析動(dòng)態(tài)表頭及導(dǎo)出實(shí)現(xiàn)過程
以前做導(dǎo)出功能,表頭和數(shù)據(jù)都是固定的,下面這篇文章主要給大家介紹了關(guān)于Java利用EasyExcel解析動(dòng)態(tài)表頭及導(dǎo)出實(shí)現(xiàn)的相關(guān)資料,文中通過實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-12-12

