JUC并發(fā)編程LinkedBlockingQueue隊列深入分析源碼
LinkedBlockingQueue介紹
在JUC包下關(guān)于線程安全的隊列實現(xiàn)有很多,那么此篇文章講解LinkedBlockingQueue的實現(xiàn)原理,相信各位讀者在線程池中能看到LinkedBlockingQueue或者SynchronousQueue隊列來作為儲存任務(wù)和消費任務(wù)的通道。一個并發(fā)安全的隊列,在多線程中充當著安全的傳輸任務(wù)的責(zé)任。
既然是介紹LinkedBlockingQueue,那么從構(gòu)造方法入手最合適不過。
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
// 初始化一個偽節(jié)點,讓head和last都指向這個偽節(jié)點
// 為什么需要偽節(jié)點的存在?
// 因為可以保證不會發(fā)生極端情況(假設(shè)沒有偽節(jié)點,并且只存在一個節(jié)點的情況下,生產(chǎn)者和消費者并發(fā)執(zhí)行就可能出現(xiàn)極端情況)
last = head = new Node<E>(null);
}
為什么需要存在偽節(jié)點,因為可以保證不會發(fā)生極端情況(假設(shè)沒有偽節(jié)點,并且只存在一個節(jié)點的情況下,生產(chǎn)者和消費者并發(fā)執(zhí)行就可能出現(xiàn)極端情況,用偽節(jié)點就能很好的解決這個極端問題)
/** * 因為是隊列,用鏈表實現(xiàn),所以頭尾指針肯定不可少。 * */ transient Node<E> head; private transient Node<E> last; /** * 我們可以很清楚的看到,這里使用了2套ReentrantLock和對應(yīng)的condition條件等待隊列。 * 目的也很明顯,讓生產(chǎn)者和消費者并行。 * */ private final ReentrantLock takeLock = new ReentrantLock(); private final Condition notEmpty = takeLock.newCondition(); private final ReentrantLock putLock = new ReentrantLock(); private final Condition notFull = putLock.newCondition();
2套ReentrantLock和對應(yīng)的condition條件等待隊列,很明顯目的是為了讓生產(chǎn)者和消費者并行,所以就需要一個偽節(jié)點處理極端并發(fā)情況。
為了,一些沒有接觸過隊列的讀者,所以這里還是介紹一下API把
| API | 用途 | 注意事項 |
| offer | 生產(chǎn)者 | 不會阻塞,如果插入失敗,或者隊列已經(jīng)滿了,直接返回 |
| poll | 消費者 | 不會阻塞,如果消費失敗,或者隊列當前為空,直接返回 |
| put | 生產(chǎn)者 | 會阻塞,如果插入失敗或者隊列已經(jīng)滿了,阻塞直到插入成功 |
| take | 消費者 | 會阻塞,如果消費失敗或者當前隊列為空,阻塞直到消費成功 |
put方法-生產(chǎn)者
public void put(E e) throws InterruptedException {
// 不能插入null
if (e == null) throw new NullPointerException();
int c = -1;
// 創(chuàng)建插入的節(jié)點。
Node<E> node = new Node<E>(e);
// 拿到生產(chǎn)者的鎖對象
final ReentrantLock putLock = this.putLock;
// 拿到全局計數(shù)器,注意這里用的是AtomicInteger,所以自增的原子性已經(jīng)保證。
final AtomicInteger count = this.count;
// 上的是可響應(yīng)中斷鎖。
putLock.lockInterruptibly();
try {
// 如果當前隊列已經(jīng)滿了,此時我們就要去阻塞,等待隊列被消費,我們要被喚醒,醒來生產(chǎn)節(jié)點。
while (count.get() == capacity) {
// 進入條件等待隊列阻塞。
// 注意,只要阻塞,是會釋放鎖的,其他生產(chǎn)者線程可以搶到鎖。
notFull.await();
}
// 插入到隊列尾部
enqueue(node);
// 因為插入了節(jié)點,所以全局計數(shù)需要+1
// 但是這里請注意細節(jié),getAndIncrement方法返回的是舊值。
c = count.getAndIncrement();
// 這里是一個很sao的點
// 注意,這里只要當前隊列沒滿,喚醒的是生產(chǎn)者的條件等待隊列。
// 為什么要這么做?
// 很簡單,首先需要考慮,生產(chǎn)者和消費者是并發(fā)執(zhí)行了。
// 其次,只要隊列沒滿就能一直生產(chǎn),那么隊列一旦滿了后,后來的線程就都去條件隊列阻塞,所以線程生產(chǎn)完一個節(jié)點就有必要去喚醒等待的同胞(不管有沒有同胞在阻塞,這是義務(wù))
if (c + 1 < capacity)
// 喚醒條件等待隊列中頭部節(jié)點。
notFull.signal();
} finally {
putLock.unlock();
}
// 這里也是一個很sao的點
// 再次強調(diào),getAndIncrement方法是返回的舊值
// 所以當前生產(chǎn)者如果生產(chǎn)的是第一個節(jié)點,那么c ==0
// 而隊列中沒有節(jié)點,消費者是要阻塞的
// 也即,這里給隊列生產(chǎn)了一個節(jié)點,要喚醒消費者去消費節(jié)點。
if (c == 0)
signalNotEmpty();
}
// 插入到隊列尾部
// 因為ReentrantLock保證了整體的原子性,所以這里細節(jié)部分不需要保證原子性了。
private void enqueue(Node<E> node) {
// 插入到尾部
last = last.next = node;
}第一次看到這個代碼難免會發(fā)生震撼,為什么在生產(chǎn)者代碼里面喚醒生產(chǎn)者?不是正常寫的生產(chǎn)者消費者模型,不都是生產(chǎn)者生產(chǎn)一個喚醒消費者消費嗎?怎么這里不一樣??????
因為這里生產(chǎn)者和消費者并行處理,當隊列滿了以后,后來的生產(chǎn)者線程都會去阻塞,所以生產(chǎn)者線程生產(chǎn)完一個節(jié)點就有必要去喚醒等待的同胞(不管有沒有同胞在阻塞,這是義務(wù))
大致流程如下:
- 創(chuàng)建Node節(jié)點
- 上生產(chǎn)者鎖
- 如果隊列已經(jīng)滿了,就去生產(chǎn)者條件隊列阻塞
- 如果沒滿,或者喚醒后,就插入到last指針的后面
- 全局節(jié)點計數(shù)器+1
- 如果當前隊列還有空間,就喚醒在阻塞的同胞。
- 釋放鎖
- 如果在生產(chǎn)之前隊列為空,本次生產(chǎn)后就需要喚醒在阻塞的消費者線程,讓他們醒來消費我剛生產(chǎn)的節(jié)點
take方法-消費者
public E take() throws InterruptedException {
E x;
int c = -1;
// 全局計數(shù)器
final AtomicInteger count = this.count;
// 消費者的鎖對象
final ReentrantLock takeLock = this.takeLock;
// 可響應(yīng)中斷鎖。
takeLock.lockInterruptibly();
try {
// 如果當前隊列中沒有節(jié)點,此時消費者需要去阻塞,因為不阻塞他只會浪費CPU性能,又消費不到節(jié)點。
while (count.get() == 0) {
// 去消費者的條件隊列阻塞。
notEmpty.await();
}
// 醒來后,去消費節(jié)點。
x = dequeue();
// 給全局計數(shù)器-1,但是這里也要注意,返回的是舊值
c = count.getAndDecrement();
// 如果隊列中還有節(jié)點就喚醒其他消費者去消費節(jié)點。
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 這里也是一個sao點
// 請注意,這里的c是舊值,因為getAndDecrement返回的是舊值
// 所以,如果當前消費線程消費節(jié)點之前隊列是滿的,當消費完畢后,我有必要去喚醒因為隊列滿了而阻塞等待的生產(chǎn)者,因為當前已經(jīng)空出一個空間了。
if (c == capacity)
// 喚醒生產(chǎn)者
signalNotFull();
return x;
}
// 消費者消費節(jié)點
// 所以需要HelpGC
// 不過這里要注意,head都是指向偽節(jié)點。
private E dequeue() {
// 拿到頭節(jié)點,
Node<E> h = head;
// 拿到頭節(jié)點的next節(jié)點,next節(jié)點作為下一個head節(jié)點。
// 因為head節(jié)點是指向偽節(jié)點,所以head.next節(jié)點就是當前要消費的節(jié)點。
Node<E> first = h.next;
// 將當前的頭結(jié)點的next指向自己。
h.next = h; // help GC
// 設(shè)置新的頭結(jié)點,也即把當前消費的節(jié)點做為下次的偽節(jié)點
// head節(jié)點指向的都是偽節(jié)點
head = first;
// 拿到當前消費者想要的數(shù)據(jù)
E x = first.item;
first.item = null;
return x;
}這里跟put生產(chǎn)者基本思想一致,只不過這里是消費者,因為是生產(chǎn)者消費者并行,所以這里也是喚醒同胞,因為當隊列為空所有的消費者都會阻塞,所以每次消費者線程消費完節(jié)點后 ,有義務(wù)喚醒同胞。
大致流程如下:
- 拿到全局計數(shù)器
- 上消費者鎖
- 如果當前隊列為空,當前消費者線程就要去阻塞
- 如果不為空,或者被喚醒以后消費節(jié)點,把消費的節(jié)點作為下一次的偽節(jié)點,也即作為head節(jié)點
- 全局計數(shù)器-1
- 喚醒同胞
- 釋放鎖
- 如果在消費之前隊列已經(jīng)滿了,那么可能會有生產(chǎn)者線程在阻塞,所以我有義務(wù)去喚醒他們
總結(jié)

尾插頭拿,生產(chǎn)者和消費者并行執(zhí)行,隊列滿了生產(chǎn)者阻塞,隊列為空消費者阻塞。消費者有義務(wù)喚醒生產(chǎn)者,生產(chǎn)者有義務(wù)喚醒消費者。
到此這篇關(guān)于JUC并發(fā)編程LinkedBlockingQueue隊列深入分析源碼的文章就介紹到這了,更多相關(guān)JUC LinkedBlockingQueue內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
基于apache poi根據(jù)模板導(dǎo)出excel的實現(xiàn)方法
下面小編就為大家?guī)硪黄赼pache poi根據(jù)模板導(dǎo)出excel的實現(xiàn)方法。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-06-06
Java中instanceof關(guān)鍵字的用法總結(jié)
instanceof是Java的一個二元操作符,和==,>,<是同一類東東。由于它是由字母組成的,所以也是Java的保留關(guān)鍵字。它的作用是測試它左邊的對象是否是它右邊的類的實例,返回boolean類型的數(shù)據(jù)2013-10-10
SpringBoot整合阿里云OSS對象存儲服務(wù)實現(xiàn)文件上傳
這篇文章主要介紹了SpringBoot整合阿里云OSS對象存儲實現(xiàn)文件上傳,幫助大家更好的理解和學(xué)習(xí)使用SpringBoot,感興趣的朋友可以了解下2021-04-04
關(guān)于Java的Condition接口最佳理解方式
這篇文章主要介紹了關(guān)于Java的Condition接口最佳理解方式,Condition就是實現(xiàn)了管程里面的條件變量,Java?語言內(nèi)置的管程里只有一個條件變量,而Lock&Condition實現(xiàn)的管程支持多個條件變量,需要的朋友可以參考下2023-05-05
Spring Native實現(xiàn)0.059s啟動一個SpringBoot項目
Spring Native是Spring框架的一個子項目,旨在提供一種將Spring應(yīng)用程序編譯為本地可執(zhí)行文件的方法,從而提高啟動時間和資源效率,本文主要介紹了Spring Native實現(xiàn)0.059s啟動一個SpringBoot項目,感興趣的可以了解一下2024-02-02
Java編程之jdk1.4,jdk1.5和jdk1.6的區(qū)別分析(經(jīng)典)
這篇文章主要介紹了Java編程之jdk1.4,jdk1.5和jdk1.6的區(qū)別分析,結(jié)合實例形式較為詳細的分析說明了jdk1.4,jdk1.5和jdk1.6版本的使用區(qū)別,需要的朋友可以參考下2015-12-12

