Java阻塞隊(duì)列BlockingQueue詳解
隊(duì)列的類(lèi)型
- 無(wú)限隊(duì)列(unbounded queue) 無(wú)容量限定,只隨存儲(chǔ)變化
- 有限隊(duì)列(bounded queue) 定義了最大容量
向無(wú)限隊(duì)列添加元素的所有操作都將永遠(yuǎn)不會(huì)阻塞(也是線程安全的),因此它可以增長(zhǎng)到非常大的容量。 使用無(wú)限阻塞隊(duì)列 BlockingQueue 設(shè)計(jì)生產(chǎn)者 - 消費(fèi)者模型時(shí)最重要的是消費(fèi)者應(yīng)該能夠像生產(chǎn)者向隊(duì)列添加消息一樣快地消費(fèi)消息 。否則可能內(nèi)存不足而拋出 OutOfMemory 異常。
數(shù)據(jù)結(jié)構(gòu)
- 1.通常使用鏈表或數(shù)組實(shí)現(xiàn)
- 2.一般具有 FIFO(先進(jìn)先出) 特性,也可以設(shè)計(jì)為雙端隊(duì)列
- 3.隊(duì)列的主要操作:入隊(duì)和出隊(duì)
阻塞隊(duì)列 BlockingQueue
定義:線程通信中,在任意時(shí)刻,無(wú)論并發(fā)有多高,在單個(gè) JVM 上,同一時(shí)間永遠(yuǎn)只有一個(gè)線程能對(duì)隊(duì)列進(jìn)行入隊(duì)或出隊(duì)操作。BlockingQueue 可以在線程之間共享而無(wú)需任何顯式同步
阻塞隊(duì)列的類(lèi)型:

JAVA中的應(yīng)用場(chǎng)景 : 線程池、SpringCloud-Eureka 三級(jí)緩存、Nacos、MQ、Netty 等
常見(jiàn)的阻塞隊(duì)列
- ArrayBlockingQueue : 由數(shù)組支持的有界隊(duì)列
- 應(yīng)用場(chǎng)景: 線程池中有比較多的應(yīng)用、生產(chǎn)者消費(fèi)者模型
- 工作原理: 基于 ReentrantLock 保證線程安全,根據(jù)Condition實(shí)現(xiàn)隊(duì)列滿時(shí)的阻塞
- LinkedBlockingQueue : 基于鏈表的無(wú)界隊(duì)列(理論上有界)
- PriorityBlockingQueue : 由優(yōu)先級(jí)堆支持的無(wú)界優(yōu)先級(jí)隊(duì)列
- DelayQueue : 由優(yōu)先級(jí)堆支持的、基于時(shí)間的調(diào)度隊(duì)列,內(nèi)部基于無(wú)界隊(duì)列PriorityQueue 實(shí)現(xiàn),而無(wú)界隊(duì)列基于數(shù)組的擴(kuò)容實(shí)現(xiàn)
- 使用方法: 入隊(duì)的對(duì)象必須要實(shí)現(xiàn) Delayed 接口,而 Delayed 集成自 Comparable 接口
- 應(yīng)用場(chǎng)景: 售賣(mài)電影票等
- 工作原理: 隊(duì)列內(nèi)部會(huì)根據(jù)時(shí)間優(yōu)先級(jí)進(jìn)行排序。延遲類(lèi)線程池周期執(zhí)行。
它們都實(shí)現(xiàn)了BlockingQueue接口,都有put()和take()等方法,創(chuàng)建方式如下:
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<> (666);
BlockingQueue API
添加元素:
| 方法 | 含義 |
|---|---|
| add() | 如果插入成功則返回 true,否則拋出 IllegalStateException 異常 |
| put() | 將指定的元素插入隊(duì)列,如果隊(duì)列滿了,會(huì)阻塞直到有空間插入 |
| offer() | 如果插入成功則返回 true,否則返回 false |
| offer(E e, long timeout, TimeUnit unit) | 嘗試將元素插入隊(duì)列,如果隊(duì)列已滿,會(huì)阻塞直到有空間插入,阻塞有時(shí)間控制 |
檢索元素:
| 方法 | 含義 |
|---|---|
| take() | 獲取隊(duì)列的頭部元素并將其刪除,如果隊(duì)列為空,則阻塞并等待元素變?yōu)榭捎?/td> |
| poll(long timeout, TimeUnit unit) | 檢索并刪除隊(duì)列的頭部,如有必要,等待指定的等待時(shí)間以使元素可用,如果超時(shí),則返回 null |
ArrayBlockingQueue 源碼簡(jiǎn)解
實(shí)現(xiàn):同步等待隊(duì)列(CLH)+ 條件等待隊(duì)列滿足條件的元素在CLH隊(duì)列中等待鎖,不滿足條件的隊(duì)列挪到條件等待隊(duì)列,滿足條件后再?gòu)?tail 插入 CLH 隊(duì)列
線程獲取鎖的條件: 在 CLH 隊(duì)列里等待的 Node 節(jié)點(diǎn),并且 Node 節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是 Singal。條件等待隊(duì)列里的線程是無(wú)法獲取鎖的。
/**
* 構(gòu)造方法
* 還有兩個(gè)構(gòu)造函數(shù),一個(gè)無(wú)fair參數(shù),一個(gè)可傳入集合,創(chuàng)建時(shí)插入隊(duì)列
* @param capacity 固定容量
* @param fair 默認(rèn)是false:訪問(wèn)順序未指定; true:按照FIFO順序處理
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
? if (capacity <= 0)
? ? ? ?throw new IllegalArgumentException();
? ?this.items = new Object[capacity];
? ?lock = new ReentrantLock(fair); // 根據(jù)fair創(chuàng)建對(duì)應(yīng)的鎖
? ?// 條件對(duì)象,配合容器能滿足業(yè)務(wù)
? ?notEmpty = lock.newCondition(); // 出隊(duì)條件對(duì)象
? ?notFull = ?lock.newCondition(); // 入隊(duì)條件對(duì)象
}
/**
* 入隊(duì)方法
* 在隊(duì)列的尾部插入指定的元素,如果隊(duì)列已滿,則等待空間可用
*/
public void put(E e) throws InterruptedException {
? ?checkNotNull(e); // 檢查put對(duì)象是否為空,空拋出異常
? ?final ReentrantLock lock = this.lock;
? ?lock.lockInterruptibly(); // 若未被中斷嘗試獲取鎖,詳見(jiàn)下文
? ?try {
? // 隊(duì)列中元素的數(shù)量 等于 排隊(duì)元素的長(zhǎng)度
? ? ? ?while (count == items.length)
? ? ? ? ? ?notFull.await(); // 見(jiàn)下文
? ? ? ?enqueue(e); // 元素入隊(duì)
? } finally {
? ? ? ?lock.unlock();
? }
}
/**
* 出隊(duì)方法
* 獲取隊(duì)列的頭部元素并將其刪除,如果隊(duì)列為空,則阻塞并等待元素變?yōu)榭捎?
*/
public E take() throws InterruptedException {
? ?final ReentrantLock lock = this.lock;
? ?lock.lockInterruptibly(); // 見(jiàn)下文
? ?try {
? ? ? ?while (count == 0)
? ? ? ? ? ?notEmpty.await(); // 見(jiàn)下文
? ? ? ?return dequeue(); // 元素出隊(duì)
? } finally {
? ? ? ?lock.unlock();
? }
}令當(dāng)前線程等待,直到收到信號(hào)或被中斷詳:與此 Condition 關(guān)聯(lián)的鎖被自動(dòng)釋放,進(jìn)入等待,并且處于休眠狀態(tài),直到發(fā)生以下四種情況之一:
- ①其他線程調(diào)用這個(gè)Condition的 signal 方法,當(dāng)前線程恰好被選為要被喚醒的線程;
- ②其他線程調(diào)用這個(gè)條件的 signalAll 方法
- ③其他線程中斷當(dāng)前線程,支持中斷線程掛起;
- ④一個(gè)“虛假的喚醒”發(fā)生了。
在這些情況下,在此方法返回之前,當(dāng)前線程必須重新獲得與此條件相關(guān)聯(lián)的鎖。當(dāng)線程返回時(shí),保證它持有這個(gè)鎖。
如果當(dāng)前線程有以下兩種情況之一:
- ①在進(jìn)入該方法時(shí)設(shè)置中斷狀態(tài);
- ②在等待時(shí)被中斷,支持線程掛起的中斷 拋出InterruptedException
生產(chǎn)者消費(fèi)者模式
BlockingQueue 可以在線程之間共享而無(wú)需任何顯式同步,在生產(chǎn)者消費(fèi)者之間,只需要將阻塞隊(duì)列以參數(shù)的形式進(jìn)行傳遞即可。它內(nèi)部的機(jī)制會(huì)自動(dòng)保證線程的安全性。
生產(chǎn)者:實(shí)現(xiàn)了 Runnable 接口,每個(gè)生產(chǎn)者生產(chǎn)100種商品和1個(gè)中斷標(biāo)記后完成線程任務(wù)
@Slf4j
@Slf4j
public class Producer implements Runnable{
? ?// 作為參數(shù)的阻塞隊(duì)列
? ?private BlockingQueue<Integer> blockingQueue;
? ?private final int stopTag;
? ?/**
? ? * 構(gòu)造方法
? ? * @param blockingQueue
? ? * @param stopTag
? ? */
? ?public Producer(BlockingQueue<Integer> blockingQueue,int stopTag) {
? ? ? ?this.blockingQueue = blockingQueue;
? ? ? ?this.stopTag = stopTag;
? }
? ?@Override
? ?public void run() {
? ? ? ?try {
? ? ? ? ? ?generateNumbers();
? ? ? } catch (InterruptedException e) {
? ? ? ? ? ?Thread.currentThread().interrupt();
? ? ? }
? }
? ?private void generateNumbers() throws InterruptedException {
? ? ? ?// 每個(gè)生產(chǎn)者都隨機(jī)生產(chǎn)10種商品
? ? ? ?for (int i = 0; i < 10; i++) {
? ? ? ? ? ?int product = ThreadLocalRandom.current().nextInt(1000,1100);
? ? ? ? ? ?log.info("生產(chǎn)者{}號(hào),生產(chǎn)了商品,編號(hào)為{}",Thread.currentThread().getId(),product);
? ? ? ? ? ?blockingQueue.put(product);
? ? ? }
? ? ? ?// 生產(chǎn)終止標(biāo)記
? ? ? ?blockingQueue.put(stopTag);
? ? ? ?log.info("生產(chǎn)者{}號(hào),生產(chǎn)了第終止標(biāo)記編號(hào){}",Thread.currentThread().getId(),Thread.currentThread().getId());
? }
}消費(fèi)者:消費(fèi)者拿到終止消費(fèi)標(biāo)記終止消費(fèi),否則消費(fèi)商品,拿到終止標(biāo)記后完成線程任務(wù)
@Slf4j
public class Consumer implements Runnable{
? ?// 作為參數(shù)的阻塞隊(duì)列
? ?private BlockingQueue<Integer> queue;
? ?private final int stopTage;
? ?public Consumer(BlockingQueue<Integer> queue, int stopTage) {
? ? ? ?this.queue = queue;
? ? ? ?this.stopTage = stopTage;
? }
? ?@Override
? ?public void run() {
? ? ? ?try {
? ? ? ? ? ?while (true) {
? ? ? ? ? ? ? ?Integer product = queue.take();
? ? ? ? ? ? ? ?if (product.equals(stopTage)) {
? ? ? ? ? ? ? ? ? ?log.info("{}號(hào)消費(fèi)者,停止消費(fèi),因?yàn)槟玫搅送V瓜M(fèi)標(biāo)記",Thread.currentThread().getId());
? ? ? ? ? ? ? ? ? ?return;
? ? ? ? ? ? ? }
? ? ? ? ? ? ? ?log.info("{}號(hào)消費(fèi)者,拿到的商品編號(hào):{}",Thread.currentThread().getId(),product);
? ? ? ? ? }
? ? ? } catch (InterruptedException e) {
? ? ? ? ? ?Thread.currentThread().interrupt();
? ? ? }
? }
}客戶端類(lèi): 創(chuàng)建與計(jì)算機(jī) CPU 核數(shù)相同的線程數(shù),與 16個(gè)生產(chǎn)者
public class ProductConsumerTest {
? ?public static void main(String[] args) {
? ? ? ?// 阻塞隊(duì)列容量
? ? ? ?int blockingQueueSize = 10;
? ? ? ?// 生產(chǎn)者數(shù)量
? ? ? ?int producerSize = 16;
? ? ? ?// 消費(fèi)者數(shù)量 = 計(jì)算機(jī)線程核數(shù) 8
? ? ? ?int consumerSize = Runtime.getRuntime().availableProcessors();
? ? ? ?// 終止消費(fèi)標(biāo)記
? ? ? ?int stopTag = Integer.MAX_VALUE;
? ? ? ?BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(blockingQueueSize);
? ? ? ?// 創(chuàng)建16個(gè)生產(chǎn)者線程
? ? ? ?for (int i = 0; i < producerSize; i++) {
? ? ? ? ? ?new Thread(new Producer(blockingQueue, stopTag)).start();
? ? ? }
? ? ? ?// 創(chuàng)建8個(gè)消費(fèi)者線程
? ? ? ?for (int j = 0; j < consumerSize; j++) {
? ? ? ? ? ?new Thread(new Consumer(blockingQueue, stopTag)).start();
? ? ? }
? }
}延遲隊(duì)列 DelayQueue
定義: Java 延遲隊(duì)列提供了在指定時(shí)間才能獲取隊(duì)列元素的功能,隊(duì)列頭元素是最接近過(guò)期的元素。沒(méi)有過(guò)期元素的話,使用 poll() 方法會(huì)返回 null 值,超時(shí)判定是通過(guò)getDelay(TimeUnit.NANOSECONDS)方法的返回值小于等于 0 來(lái)判斷。延時(shí)隊(duì)列不能存放空元素。
/**
* 電影票類(lèi),實(shí)現(xiàn)了Delayed接口,重寫(xiě) compareTo 和 getDelay方法
*/
public class MovieTicket implements Delayed {
? ?//延遲時(shí)間
? ?private final long delay;
? ?//到期時(shí)間
? ?private final long expire;
? ?//數(shù)據(jù)
? ?private final String msg;
? ?//創(chuàng)建時(shí)間
? ?private final long now;
? ?public long getDelay() {
? ? ? ?return delay;
? }
? ?public long getExpire() {
? ? ? ?return expire;
? }
? ?public String getMsg() {
? ? ? ?return msg;
? }
? ?public long getNow() {
? ? ? ?return now;
? }
? ?/**
? ? * @param msg 消息
? ? * @param delay 延期時(shí)間
? ? */
? ?public MovieTicket(String msg , long delay) {
? ? ? ?this.delay = delay;
? ? ? ?this.msg = msg;
? ? ? ?expire = System.currentTimeMillis() + delay; ? ?//到期時(shí)間 = 當(dāng)前時(shí)間+延遲時(shí)間
? ? ? ?now = System.currentTimeMillis();
? }
? ?/**
? ? * @param msg
? ? */
? ?public MovieTicket(String msg){
? ? ? ?this(msg,1000);
? }
? ?public MovieTicket(){
? ? ? ?this(null,1000);
? }
? ?/**
? ? * 獲得延遲時(shí)間 ? 用過(guò)期時(shí)間-當(dāng)前時(shí)間,時(shí)間單位毫秒
? ? * @param unit
? ? * @return
? ? */
? ?@Override
? ?public long getDelay(TimeUnit unit) {
? ? ? ?return unit.convert(this.expire
? ? ? ? ? ? ? ?- System.currentTimeMillis() , TimeUnit.MILLISECONDS);
? }
? ?/**
? ? * 用于延遲隊(duì)列內(nèi)部比較排序 當(dāng)前時(shí)間的延遲時(shí)間 - 比較對(duì)象的延遲時(shí)間
? ? * 越早過(guò)期的時(shí)間在隊(duì)列中越靠前
? ? * @param delayed
? ? * @return
? ? */
? ?@Override
? ?public int compareTo(Delayed delayed) {
? ? ? ?return (int) (this.getDelay(TimeUnit.MILLISECONDS)
? ? ? ? ? ? ? ?- delayed.getDelay(TimeUnit.MILLISECONDS));
? }
}測(cè)試類(lèi):
public static void main(String[] args) {
? ?DelayQueue<MovieTicket> delayQueue = new DelayQueue<MovieTicket>();
? ?MovieTicket ticket = new MovieTicket("電影票1",10000);
? ?delayQueue.put(ticket);
? ?MovieTicket ticket1 = new MovieTicket("電影票2",5000);
? ?delayQueue.put(ticket1);
? ?MovieTicket ticket2 = new MovieTicket("電影票3",8000);
? ?delayQueue.put(ticket2);
? ?log.info("message:--->入隊(duì)完畢");
? ?while( delayQueue.size() > 0 ){
? ? ? ?try {
? ? ? ? ? ?ticket = delayQueue.take();
? ? ? ? ? ?log.info("電影票出隊(duì):{}",ticket.getMsg());
? ? ? } catch (InterruptedException e) {
? ? ? ? ? ?e.printStackTrace();
? ? ? }
? }
}從運(yùn)行結(jié)果可以看出隊(duì)列是延遲出隊(duì),間隔和我們所設(shè)置的時(shí)間相同
到此這篇關(guān)于Java阻塞隊(duì)列BlockingQueue詳解的文章就介紹到這了,更多相關(guān)Java BlockingQueue 內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- 關(guān)于Java中阻塞隊(duì)列BlockingQueue的詳解
- Java阻塞隊(duì)列BlockingQueue基礎(chǔ)與使用
- Java阻塞隊(duì)列必看類(lèi):BlockingQueue快速了解大體框架和實(shí)現(xiàn)思路
- Java?阻塞隊(duì)列BlockingQueue詳解
- Java并發(fā)編程之阻塞隊(duì)列(BlockingQueue)詳解
- Java源碼解析阻塞隊(duì)列ArrayBlockingQueue介紹
- Java源碼解析阻塞隊(duì)列ArrayBlockingQueue常用方法
- Java源碼解析阻塞隊(duì)列ArrayBlockingQueue功能簡(jiǎn)介
- 詳解Java阻塞隊(duì)列(BlockingQueue)的實(shí)現(xiàn)原理
- java 中 阻塞隊(duì)列BlockingQueue詳解及實(shí)例
- 一文簡(jiǎn)介Java中BlockingQueue阻塞隊(duì)列
相關(guān)文章
SpringBoot之QueryDsl嵌套子查詢問(wèn)題
這篇文章主要介紹了SpringBoot之QueryDsl嵌套子查詢問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-03-03
java開(kāi)發(fā)validate方法中校驗(yàn)工具類(lèi)詳解
這篇文章主要為大家介紹了java開(kāi)發(fā)validate方法中校驗(yàn)工具類(lèi)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-09-09
Java Servlet線程中AsyncContext異步處理Http請(qǐng)求
這篇文章主要介紹了Java Servlet線程中AsyncContext異步處理Http請(qǐng)求及在業(yè)務(wù)中應(yīng)用,AsyncContext是Servlet 3.0使Servlet 線程不再需要一直阻塞,直到業(yè)務(wù)處理完畢才能再輸出響應(yīng),最后才結(jié)束該Servlet線程2023-03-03
Java mysql數(shù)據(jù)庫(kù)并進(jìn)行內(nèi)容查詢實(shí)例代碼
這篇文章主要介紹了Java mysql數(shù)據(jù)庫(kù)并進(jìn)行內(nèi)容查詢實(shí)例代碼的相關(guān)資料,需要的朋友可以參考下2016-11-11
Java獲取Prometheus監(jiān)控?cái)?shù)據(jù)的方法實(shí)現(xiàn)
本文主要介紹了Java獲取Prometheus監(jiān)控?cái)?shù)據(jù)的方法實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-12-12
springcloud之自定義簡(jiǎn)易消費(fèi)服務(wù)組件
這篇文章主要介紹了springcloud之自定義簡(jiǎn)易消費(fèi)服務(wù)組件,本篇來(lái)使用rest+ribbon消費(fèi)服務(wù),并且通過(guò)輪詢方式來(lái)自定義了個(gè)簡(jiǎn)易消費(fèi)組件,感興趣的小伙伴們可以參考一下2018-06-06
Java開(kāi)發(fā)SpringBoot集成接口文檔實(shí)現(xiàn)示例
這篇文章主要為大家介紹了Java開(kāi)發(fā)SpringBoot如何集成接口文檔的實(shí)現(xiàn)示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步2021-10-10
springboot+mybatis攔截器方法實(shí)現(xiàn)水平分表操作
這篇文章主要介紹了springboot+mybatis攔截器方法實(shí)現(xiàn)水平分表操作,文章圍繞主題展開(kāi)詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的朋友可以參考一下2022-08-08
Java調(diào)用Docx4j庫(kù)玩轉(zhuǎn)Word文檔處理
在 Java 開(kāi)發(fā)里處理 Word 文檔時(shí),Docx4j 可是個(gè)超厲害的庫(kù),它能讓咱輕松創(chuàng)建,讀取,修改和轉(zhuǎn)換 Word 文檔,下面我們就來(lái)看看具體是如何操作的吧2025-02-02
Java使用正則表達(dá)式提取XML節(jié)點(diǎn)內(nèi)容的方法示例
這篇文章主要介紹了Java使用正則表達(dá)式提取XML節(jié)點(diǎn)內(nèi)容的方法,結(jié)合具體實(shí)例形式分析了java針對(duì)xml格式字符串的正則匹配相關(guān)操作技巧,需要的朋友可以參考下2017-08-08

