Java 中的阻塞隊(duì)列從基礎(chǔ)到高級(jí)的深度解析
提到阻塞隊(duì)列,許多人腦海中會(huì)浮現(xiàn)出
BlockingQueue、ArrayBlockingQueue、LinkedBlockingQueue和SynchronousQueue。盡管這些實(shí)現(xiàn)看起來(lái)復(fù)雜,實(shí)際上阻塞隊(duì)列本身的概念相對(duì)簡(jiǎn)單,真正挑戰(zhàn)在于內(nèi)部的 AQS(Abstract Queuing Synchronizer)。如果你對(duì)阻塞隊(duì)列感到陌生,希望下面的內(nèi)容能幫助你從全新角度理解它。
1、線程間通信
線程間通信是指多個(gè)線程對(duì)共享資源的操作和協(xié)調(diào)。在生產(chǎn)者-消費(fèi)者模型中,生產(chǎn)者和消費(fèi)者是不同種類的線程,他們對(duì)同一個(gè)資源(如隊(duì)列)進(jìn)行操作。生產(chǎn)者負(fù)責(zé)向隊(duì)列中插入數(shù)據(jù),消費(fèi)者負(fù)責(zé)從隊(duì)列中取出數(shù)據(jù)。
主要挑戰(zhàn)在于如何在資源達(dá)到上限時(shí)讓生產(chǎn)者等待,而在資源達(dá)到下限時(shí)讓消費(fèi)者等待。線程間的這種相互調(diào)度,就是線程間通信。
以現(xiàn)實(shí)生活為例。消費(fèi)者和生產(chǎn)者就像兩個(gè)線程,原本做著各自的事情,廠家管自己生產(chǎn),消費(fèi)者管自己買,一般情況下彼此互不影響。900 240

但當(dāng)物資到達(dá)某個(gè)臨界點(diǎn)時(shí),就需要根據(jù)供需關(guān)系適當(dāng)作出調(diào)整。比如,當(dāng)廠家做了一大堆東西,產(chǎn)能過(guò)剩時(shí),應(yīng)該暫停生產(chǎn),擴(kuò)大宣傳,讓消費(fèi)者過(guò)來(lái)消費(fèi)。

同理,當(dāng)消費(fèi)者發(fā)現(xiàn)某個(gè)熱銷商品售罄,應(yīng)該提醒廠家盡快生產(chǎn)。

在上面的案例中,生產(chǎn)者和消費(fèi)者是不同種類的線程,一個(gè)負(fù)責(zé)存入,另一個(gè)負(fù)責(zé)取出,且它們操作的是同一個(gè)資源。但最難的部分在于:資源到達(dá)上限時(shí),生產(chǎn)者等待,消費(fèi)者消費(fèi);資源達(dá)到下限時(shí),生產(chǎn)者生產(chǎn),消費(fèi)者等待。
我們可以發(fā)現(xiàn),原本互不打擾的兩個(gè)線程之間開(kāi)始了 “溝通”:
- 生產(chǎn)者:做的商品太多了,應(yīng)該擴(kuò)大宣傳,讓大家來(lái)買。
- 消費(fèi)者:都賣完啦,應(yīng)當(dāng)提醒商家盡快補(bǔ)貨。
這種線程間的相互調(diào)度,也就是線程間通信。
2、線程間通信的實(shí)現(xiàn)
實(shí)現(xiàn)線程間通信的方式有多種:
- 輪詢:生產(chǎn)者和消費(fèi)者線程通過(guò)循環(huán)不斷檢查隊(duì)列的狀態(tài)。這種方法簡(jiǎn)單,但會(huì)消耗大量 CPU 資源,且無(wú)法保證原子性。
- 等待喚醒機(jī)制(wait/notify):通過(guò)
wait和notify機(jī)制,線程可以在隊(duì)列為空或滿時(shí)阻塞自己,當(dāng)狀態(tài)改變時(shí)由其他線程喚醒。synchronized保證了線程的原子性,但notify可能導(dǎo)致線程競(jìng)爭(zhēng)不均。 - 等待喚醒機(jī)制(Condition):使用
ReentrantLock和Condition實(shí)現(xiàn)等待喚醒機(jī)制,可以更加精確地控制線程的阻塞和喚醒。通過(guò)創(chuàng)建不同的Condition實(shí)例,可以分別管理生產(chǎn)者和消費(fèi)者的等待狀態(tài),避免了notify的隨機(jī)喚醒問(wèn)題。
2.1、輪詢
設(shè)計(jì)理念:生產(chǎn)者和消費(fèi)者線程通過(guò)循環(huán)不斷檢查隊(duì)列的狀態(tài),隊(duì)列為空時(shí)生產(chǎn)者才可插入數(shù)據(jù),隊(duì)列不為空時(shí)消費(fèi)者才能取出數(shù)據(jù),否則一律 sleep 等待。

代碼實(shí)現(xiàn):
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
/**
* 自定義阻塞隊(duì)列實(shí)現(xiàn):輪詢版本
*
* @param <T> 隊(duì)列中存儲(chǔ)的元素類型
*/
public class WhileQueue<T> {
// 用來(lái)存儲(chǔ)元素的容器
private final LinkedList<T> queue = new LinkedList<>();
// 隊(duì)列的最大容量
private final int MAX_SIZE = 1;
/**
* 將元素添加到隊(duì)列中
*
* @param resource 要插入的元素
* @throws InterruptedException 如果當(dāng)前線程被中斷
*/
public void put(T resource) throws InterruptedException {
// 如果隊(duì)列滿了,生產(chǎn)者線程將進(jìn)入輪詢等待狀態(tài)
while (queue.size() >= MAX_SIZE) {
System.out.println("生產(chǎn)者:隊(duì)列已滿,無(wú)法插入...");
TimeUnit.MILLISECONDS.sleep(1000); // 線程等待1秒鐘再重試
}
// 插入元素到隊(duì)列的前面
System.out.println("生產(chǎn)者:插入" + resource + "!!!");
queue.addFirst(resource);
}
/**
* 從隊(duì)列中取出元素
*
* @throws InterruptedException 如果當(dāng)前線程被中斷
*/
public void take() throws InterruptedException {
// 如果隊(duì)列為空,消費(fèi)者線程將進(jìn)入輪詢等待狀態(tài)
while (queue.size() <= 0) {
System.out.println("消費(fèi)者:隊(duì)列為空,無(wú)法取出...");
TimeUnit.MILLISECONDS.sleep(1000); // 線程等待1秒鐘再重試
}
// 從隊(duì)列的末尾取出元素
System.out.println("消費(fèi)者:取出消息!!!");
queue.removeLast();
TimeUnit.MILLISECONDS.sleep(5000); // 模擬消費(fèi)操作需要時(shí)間
}
}測(cè)試:
/**
* 測(cè)試類:創(chuàng)建生產(chǎn)者和消費(fèi)者線程來(lái)測(cè)試WhileQueue的功能
*/
public class Test {
public static void main(String[] args) {
// 創(chuàng)建一個(gè)WhileQueue實(shí)例
WhileQueue<String> queue = new WhileQueue<>();
// 創(chuàng)建并啟動(dòng)生產(chǎn)者線程
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 100; i++) {
try {
queue.put("消息" + i); // 插入消息到隊(duì)列
} catch (InterruptedException e) {
e.printStackTrace(); // 捕獲并打印中斷異常
}
}
}
}).start();
// 創(chuàng)建并啟動(dòng)消費(fèi)者線程
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 100; i++) {
try {
queue.take(); // 從隊(duì)列中取出消息
} catch (InterruptedException e) {
e.printStackTrace(); // 捕獲并打印中斷異常
}
}
}
}).start();
}
}由于設(shè)定了隊(duì)列最多只能存1個(gè)消息,所以只有當(dāng)隊(duì)列為空時(shí),生產(chǎn)者才能插入數(shù)據(jù)。這是最簡(jiǎn)單的線程間通信:多個(gè)線程不斷輪詢共享資源,通過(guò)共享資源的狀態(tài)判斷自己下一步該做什么。
但上面的實(shí)現(xiàn)方式存在一些缺點(diǎn):
- 輪詢的方式太耗費(fèi) CPU 資源,如果線程過(guò)多,比如幾百上千個(gè)線程同時(shí)在那輪詢,會(huì)給 CPU 帶來(lái)較大負(fù)擔(dān)
- 無(wú)法保證原子性(代碼里沒(méi)有演示,但理論上確實(shí)如此,如果生產(chǎn)者的操作非原子性,消費(fèi)者極可能獲取到臟數(shù)據(jù))
2.2、等待喚醒機(jī)制(wait/notify)
相對(duì)而言,等待喚醒機(jī)制則要優(yōu)雅得多,底層維護(hù)線程隊(duì)列,線程可以在隊(duì)列為空或滿時(shí)阻塞自己,當(dāng)狀態(tài)改變時(shí)由其他線程喚醒。synchronized 保證了線程的原子性,同時(shí)避免了過(guò)多線程同時(shí)自旋造成的 CPU 資源浪費(fèi),頗有點(diǎn)用空間換時(shí)間的味道。
當(dāng)一個(gè)生產(chǎn)者線程無(wú)法插入數(shù)據(jù)時(shí),就讓它在隊(duì)列里休眠(阻塞),此時(shí)生產(chǎn)者線程會(huì)釋放 CPU 資源,等到消費(fèi)者搶到 CPU 執(zhí)行權(quán)并取出數(shù)據(jù)后,再由消費(fèi)者喚醒生產(chǎn)者繼續(xù)生產(chǎn)。
Java 有多種方式可以實(shí)現(xiàn)等待喚醒機(jī)制,最經(jīng)典的就是通過(guò) wait 和 notify 的方式:
import java.util.LinkedList;
/**
* 自定義阻塞隊(duì)列實(shí)現(xiàn):使用 wait/notify
*
* @param <T> 隊(duì)列中存儲(chǔ)的元素類型
*/
public class WaitNotifyQueue<T> {
// 用來(lái)存儲(chǔ)元素的容器
private final LinkedList<T> queue = new LinkedList<>();
// 隊(duì)列的最大容量
private final int MAX_SIZE = 1;
/**
* 將元素添加到隊(duì)列中
*
* @param resource 要插入的元素
* @throws InterruptedException 如果當(dāng)前線程被中斷
*/
public synchronized void put(T resource) throws InterruptedException {
// 當(dāng)隊(duì)列滿時(shí),生產(chǎn)者線程進(jìn)入等待狀態(tài)
while (queue.size() >= MAX_SIZE) {
System.out.println("生產(chǎn)者:隊(duì)列已滿,無(wú)法插入...");
this.wait(); // 釋放鎖,并進(jìn)入等待狀態(tài)
}
// 插入元素到隊(duì)列的前面
System.out.println("生產(chǎn)者:插入" + resource + "!!!");
queue.addFirst(resource);
this.notify(); // 喚醒等待的消費(fèi)者線程
}
/**
* 從隊(duì)列中取出元素
*
* @throws InterruptedException 如果當(dāng)前線程被中斷
*/
public synchronized void take() throws InterruptedException {
// 當(dāng)隊(duì)列為空時(shí),消費(fèi)者線程進(jìn)入等待狀態(tài)
while (queue.size() <= 0) {
System.out.println("消費(fèi)者:隊(duì)列為空,無(wú)法取出...");
this.wait(); // 釋放鎖,并進(jìn)入等待狀態(tài)
}
// 從隊(duì)列的末尾取出元素
System.out.println("消費(fèi)者:取出消息!!!");
queue.removeLast();
this.notify(); // 喚醒等待的生產(chǎn)者線程
}
}基于 wait 和 notify 的阻塞隊(duì)列。其原理是通過(guò)同步機(jī)制和線程通信來(lái)處理生產(chǎn)者-消費(fèi)者問(wèn)題。在 put 方法中,生產(chǎn)者線程檢查隊(duì)列是否已滿,如果已滿,則調(diào)用 wait 使自己進(jìn)入等待狀態(tài),釋放鎖,直到隊(duì)列有空位。生產(chǎn)者在插入元素后調(diào)用 notify 喚醒可能等待的消費(fèi)者線程。在 take 方法中,消費(fèi)者線程檢查隊(duì)列是否為空,如果為空,則調(diào)用 wait 使自己進(jìn)入等待狀態(tài),釋放鎖,直到隊(duì)列有新元素。消費(fèi)者在取出元素后調(diào)用 notify 喚醒可能等待的生產(chǎn)者線程。這種機(jī)制避免了忙等待,通過(guò)有效的線程通信提高了資源利用效率。
Ps:使用 notifyAll 在某些情況下可能更合適,尤其是當(dāng)有多個(gè)生產(chǎn)者和消費(fèi)者線程時(shí)。notifyAll 會(huì)喚醒所有等待的線程,而不僅僅是一個(gè)線程,這樣可以保證系統(tǒng)中的所有線程都有機(jī)會(huì)被喚醒,避免了因線程喚醒不充分導(dǎo)致的潛在問(wèn)題。
2.3、等待喚醒機(jī)制(Condition)
等待喚醒機(jī)制(wait/notify)版本的缺點(diǎn)是隨機(jī)喚醒容易出現(xiàn)"己方喚醒己方",最終導(dǎo)致全部線程阻塞的烏龍事件,雖然 wait/notifyAll 能解決這個(gè)問(wèn)題,但喚醒全部線程又不夠精確,會(huì)造成無(wú)謂的線程競(jìng)爭(zhēng)(實(shí)際只需要喚醒敵方線程即可)。
因此使用ReentrantLock和Condition實(shí)現(xiàn)等待喚醒機(jī)制,可以更加精確地控制線程的阻塞和喚醒。通過(guò)創(chuàng)建不同的Condition實(shí)例,可以分別管理生產(chǎn)者和消費(fèi)者的等待狀態(tài),避免了notify的隨機(jī)喚醒問(wèn)題。
作為改進(jìn)版,可以使用 ReentrantLock 的 Condition 替代 synchronized 和 wait/notify:
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionQueue<T> {
// 容器,用來(lái)裝東西
private final LinkedList<T> queue = new LinkedList<>();
private final int CAPACITY = 10; // 隊(duì)列容量
// 顯式鎖(相對(duì)地,synchronized鎖被稱為隱式鎖)
private final ReentrantLock lock = new ReentrantLock();
private final Condition producerCondition = lock.newCondition();
private final Condition consumerCondition = lock.newCondition();
public void put(T resource) throws InterruptedException {
lock.lock();
try {
while (queue.size() >= CAPACITY) {
// 隊(duì)列滿了,不能再塞東西了,等待消費(fèi)者取出數(shù)據(jù)
System.out.println("生產(chǎn)者:隊(duì)列已滿,無(wú)法插入...");
// 生產(chǎn)者阻塞
producerCondition.await();
}
System.out.println("生產(chǎn)者:插入" + resource + "!!!");
queue.addFirst(resource);
// 生產(chǎn)完畢,喚醒消費(fèi)者
consumerCondition.signal();
} finally {
lock.unlock();
}
}
public void take() throws InterruptedException {
lock.lock();
try {
while (queue.size() <= 0) {
// 隊(duì)列空了,不能再取東西,等待生產(chǎn)者插入數(shù)據(jù)
System.out.println("消費(fèi)者:隊(duì)列為空,無(wú)法取出...");
// 消費(fèi)者阻塞
consumerCondition.await();
}
System.out.println("消費(fèi)者:取出消息!!!");
queue.removeLast();
// 消費(fèi)完畢,喚醒生產(chǎn)者
producerCondition.signal();
} finally {
lock.unlock();
}
}
}如何理解 Condition 呢?可以認(rèn)為 lock.newCondition() 創(chuàng)建了一個(gè)隊(duì)列,調(diào)用 producerCondition.await() 會(huì)把生產(chǎn)者線程放入生產(chǎn)者的等待隊(duì)列中,當(dāng)消費(fèi)者調(diào)用producerCondition.signal() 時(shí)會(huì)喚醒從生產(chǎn)者的等待隊(duì)列中喚醒一個(gè)生產(chǎn)者線程出來(lái)工作。
也就是說(shuō),ReentrantLock 的 Condition 通過(guò)拆分線程等待隊(duì)列,讓線程的等待喚醒更加精確了,想喚醒哪一方就喚醒哪一方。
3、自定義阻塞隊(duì)列
基于以上機(jī)制,我們可以自定義實(shí)現(xiàn)一個(gè)簡(jiǎn)單的阻塞隊(duì)列。以下代碼示例展示了一個(gè)基于 wait/notifyAll 實(shí)現(xiàn)的阻塞隊(duì)列:
public class BlockingQueue<T> {
private final LinkedList<T> queue = new LinkedList<>();
private int MAX_SIZE = 1;
private int remainCount = 0;
public BlockingQueue(int capacity) {
if (capacity <= 0) {
throw new IllegalArgumentException("size最小為1");
}
this.MAX_SIZE = capacity;
}
public synchronized void put(T resource) throws InterruptedException {
while (queue.size() >= MAX_SIZE) {
this.wait();
}
queue.addFirst(resource);
remainCount++;
this.notifyAll();
}
public synchronized T take() throws InterruptedException {
while (queue.size() <= 0) {
this.wait();
}
T resource = queue.removeLast();
remainCount--;
this.notifyAll();
return resource;
}
}4、Java 中的 BlockingQueue
BlockingQueue 是 Java 并發(fā)包(java.util.concurrent)中的一個(gè)接口,繼承自 Queue 接口。它提供了額外的阻塞操作,例如在隊(duì)列為空時(shí)等待元素變得可用,或在隊(duì)列已滿時(shí)等待空間變得可用。
BlockingQueue 阻塞隊(duì)列在 Java 中的主要實(shí)現(xiàn)有三個(gè):
ArrayBlockingQueue: 基于數(shù)組實(shí)現(xiàn)的有界阻塞隊(duì)列,必須指定固定容量,支持可選的公平性策略。LinkedBlockingQueue: 基于鏈表實(shí)現(xiàn)的阻塞隊(duì)列,默認(rèn)無(wú)界或指定容量,有較高的插入和刪除性能。SynchronousQueue: 一個(gè)沒(méi)有內(nèi)部容量的隊(duì)列,每個(gè)插入操作必須等待一個(gè)對(duì)應(yīng)的刪除操作,反之亦然,適用于直接交換數(shù)據(jù)的場(chǎng)景。
更多實(shí)現(xiàn)可以參考:Java 并發(fā)集合:阻塞隊(duì)列集合介紹
到此這篇關(guān)于如何理解 Java 中的阻塞隊(duì)列:從基礎(chǔ)到高級(jí)的深度解析的文章就介紹到這了,更多相關(guān)java阻塞隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringMVC 通過(guò)ajax 實(shí)現(xiàn)文件上傳的步驟
使用form表單在springmvc 項(xiàng)目中上傳文件,文件上傳成功之后往往會(huì)跳轉(zhuǎn)到其他的頁(yè)面,但是有的時(shí)候,文件上傳成功的同時(shí),并不需要進(jìn)行頁(yè)面的跳轉(zhuǎn),可以通過(guò)ajax來(lái)實(shí)現(xiàn)文件的上傳,下面給大家介紹SpringMVC 通過(guò)ajax 實(shí)現(xiàn)文件上傳的步驟,感興趣的朋友一起看看吧2025-05-05
springboot Controller直接返回String類型帶來(lái)的亂碼問(wèn)題及解決
文章介紹了在Spring Boot中,當(dāng)Controller直接返回String類型時(shí)可能出現(xiàn)的亂碼問(wèn)題,并提供了解決辦法,通過(guò)在`application.yaml`中設(shè)置請(qǐng)求和響應(yīng)的編碼格式,并在自定義配置類中進(jìn)行配置,可以有效解決這一問(wèn)題2024-11-11
Springmvc異常處理器及攔截器實(shí)現(xiàn)代碼
這篇文章主要介紹了Springmvc異常處理器及攔截器實(shí)現(xiàn)代碼,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-10-10
java JDBC主要組件連接數(shù)據(jù)庫(kù)及執(zhí)行SQL過(guò)程示例全面詳解
這篇文章主要為大家介紹了java JDBC主要組件連接數(shù)據(jù)庫(kù)及執(zhí)行SQL的過(guò)程示例全面詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-06-06
SpringLDAP目錄服務(wù)之LdapTemplate與LDAP操作方式
本文將深入探討Spring LDAP的核心概念、LdapTemplate的使用方法以及如何執(zhí)行常見(jiàn)的LDAP操作,幫助開(kāi)發(fā)者有效地將LDAP集成到Spring應(yīng)用中,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2025-04-04
java虛擬機(jī)JVM類加載機(jī)制原理(面試必問(wèn))
這篇文章主要介紹了面試當(dāng)中必會(huì)問(wèn)到的java虛擬機(jī)JVM類加載機(jī)制,非常的詳細(xì),有需要的朋友可以借鑒參考下,歡迎多多交流討論2021-08-08

