Java并發(fā)編程中的ConcurrentLinkedQueue詳解
ConcurrentLinkedQueue 詳解
ConcurrentLinkedQueue示例 下面通過一個示例來了解ConcurrentLinkedQueue的使用
import java.util.concurrent.ConcurrentLinkedQueue;
class PutThread extends Thread {
private ConcurrentLinkedQueue<Integer> clq;
public PutThread(ConcurrentLinkedQueue<Integer> clq) {
this.clq = clq;
}
public void run() {
for (int i = 0; i < 10; i++) {
try {
System.out.println("add " + i);
clq.add(i);
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class GetThread extends Thread {
private ConcurrentLinkedQueue<Integer> clq;
public GetThread(ConcurrentLinkedQueue<Integer> clq) {
this.clq = clq;
}
public void run() {
for (int i = 0; i < 10; i++) {
try {
System.out.println("poll " + clq.poll());
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class ConcurrentLinkedQueueDemo {
public static void main(String[] args) {
ConcurrentLinkedQueue<Integer> clq = new ConcurrentLinkedQueue<Integer>();
PutThread p1 = new PutThread(clq);
GetThread g1 = new GetThread(clq);
p1.start();
g1.start();
}
}運行結(jié)果: add 0
poll null
add 1
poll 0
add 2
poll 1
add 3
poll 2
add 4
poll 3
add 5
poll 4
poll 5
add 6
add 7
poll 6
poll 7
add 8
add 9
poll 8
說明: GetThread線程不會因為ConcurrentLinkedQueue隊列為空而等待,而是直接返回null,所以當(dāng)實現(xiàn)隊列不空時,等待時,則需要用戶自己實現(xiàn)等待邏輯。
ConcurrentLinkedQueue數(shù)據(jù)結(jié)構(gòu) ConcurrentLinkedQueue的數(shù)據(jù)結(jié)構(gòu)與LinkedBlockingQueue的數(shù)據(jù)結(jié)構(gòu)相同,都是使用的鏈表結(jié)構(gòu)。ConcurrentLinkedQueue的數(shù)據(jù)結(jié)構(gòu)如下:

ConcurrentLinkedQueue采用的鏈表結(jié)構(gòu),并且包含有一個頭節(jié)點和一個尾結(jié)點
核心函數(shù)分析
offer函數(shù)
public boolean offer(E e) {
// 元素不為null
checkNotNull(e);
// 新生一個結(jié)點
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) { // 無限循環(huán)
// q為p結(jié)點的下一個結(jié)點
Node<E> q = p.next;
if (q == null) { // q結(jié)點為null
// p is last node
if (p.casNext(null, newNode)) { // 比較并進(jìn)行替換p結(jié)點的next域
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // p不等于t結(jié)點,不一致 // hop two nodes at a time
// 比較并替換尾結(jié)點
casTail(t, newNode); // Failure is OK.
// 返回
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q) // p結(jié)點等于q結(jié)點
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
// 原來的尾結(jié)點與現(xiàn)在的尾結(jié)點是否相等,若相等,則p賦值為head,否則,賦值為現(xiàn)在的尾結(jié)點
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
// 重新賦值p結(jié)點
p = (p != t && t != (t = tail)) ? t : q;
}
}offer函數(shù)用于將指定元素插入此隊列的尾部。下面模擬offer函數(shù)的操作,隊列狀態(tài)的變化(假設(shè)單線程添加元素,連續(xù)添加10、20兩個元素)。

若ConcurrentLinkedQueue的初始狀態(tài)如上圖所示,即隊列為空。單線程添加元素,此時,添加元素10,則狀態(tài)如下所示

如上圖所示,添加元素10后,tail沒有變化,還是指向之前的結(jié)點,繼續(xù)添加元素20,則狀態(tài)如下所示

如上圖所示,添加元素20后,tail指向了最新添加的結(jié)點。
poll函數(shù)
public E poll() {
restartFromHead:
for (;;) { // 無限循環(huán)
for (Node<E> h = head, p = h, q;;) { // 保存頭節(jié)點
// item項
E item = p.item;
if (item != null && p.casItem(item, null)) { // item不為null并且比較并替換item成功
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // p不等于h // hop two nodes at a time
// 更新頭節(jié)點
updateHead(h, ((q = p.next) != null) ? q : p);
// 返回item
return item;
}
else if ((q = p.next) == null) { // q結(jié)點為null
// 更新頭節(jié)點
updateHead(h, p);
return null;
}
else if (p == q) // p等于q
// 繼續(xù)循環(huán)
continue restartFromHead;
else
// p賦值為q
p = q;
}
}
}此函數(shù)用于獲取并移除此隊列的頭,如果此隊列為空,則返回null。下面模擬poll函數(shù)的操作,隊列狀態(tài)的變化(假設(shè)單線程操作,狀態(tài)為之前offer10、20后的狀態(tài),poll兩次)。

隊列初始狀態(tài)如上圖所示,在poll操作后,隊列的狀態(tài)如下圖所示

如上圖可知,poll操作后,head改變了,并且head所指向的結(jié)點的item變?yōu)榱薾ull。再進(jìn)行一次poll操作,隊列的狀態(tài)如下圖所示。

如上圖可知,poll操作后,head結(jié)點沒有變化,只是指示的結(jié)點的item域變成了null。
remove函數(shù)
public boolean remove(Object o) {
// 元素為null,返回
if (o == null) return false;
Node<E> pred = null;
for (Node<E> p = first(); p != null; p = succ(p)) { // 獲取第一個存活的結(jié)點
// 第一個存活結(jié)點的item值
E item = p.item;
if (item != null &&
o.equals(item) &&
p.casItem(item, null)) { // 找到item相等的結(jié)點,并且將該結(jié)點的item設(shè)置為null
// p的后繼結(jié)點
Node<E> next = succ(p);
if (pred != null && next != null) // pred不為null并且next不為null
// 比較并替換next域
pred.casNext(p, next);
return true;
}
// pred賦值為p
pred = p;
}
return false;
}此函數(shù)用于從隊列中移除指定元素的單個實例(如果存在)。其中,會調(diào)用到first函數(shù)和succ函數(shù),first函數(shù)的源碼如下
Node<E> first() {
restartFromHead:
for (;;) { // 無限循環(huán),確保成功
for (Node<E> h = head, p = h, q;;) {
// p結(jié)點的item域是否為null
boolean hasItem = (p.item != null);
if (hasItem || (q = p.next) == null) { // item不為null或者next域為null
// 更新頭節(jié)點
updateHead(h, p);
// 返回結(jié)點
return hasItem ? p : null;
}
else if (p == q) // p等于q
// 繼續(xù)從頭節(jié)點開始
continue restartFromHead;
else
// p賦值為q
p = q;
}
}
}first函數(shù)用于找到鏈表中第一個存活的結(jié)點。succ函數(shù)源碼如下
final Node<E> succ(Node<E> p) {
// p結(jié)點的next域
Node<E> next = p.next;
// 如果next域為自身,則返回頭節(jié)點,否則,返回next
return (p == next) ? head : next;
}succ用于獲取結(jié)點的下一個結(jié)點。如果結(jié)點的next域指向自身,則返回head頭節(jié)點,否則,返回next結(jié)點。下面模擬remove函數(shù)的操作,隊列狀態(tài)的變化(假設(shè)單線程操作,狀態(tài)為之前offer10、20后的狀態(tài),執(zhí)行remove(10)、remove(20)操作)。

如上圖所示,為ConcurrentLinkedQueue的初始狀態(tài),remove(10)后的狀態(tài)如下圖所示

如上圖所示,當(dāng)執(zhí)行remove(10)后,head指向了head結(jié)點之前指向的結(jié)點的下一個結(jié)點,并且head結(jié)點的item域置為null。繼續(xù)執(zhí)行remove(20),狀態(tài)如下圖所示

如上圖所示,執(zhí)行remove(20)后,head與tail指向同一個結(jié)點,item域為null。
size函數(shù)
public int size() {
// 計數(shù)
int count = 0;
for (Node<E> p = first(); p != null; p = succ(p)) // 從第一個存活的結(jié)點開始往后遍歷
if (p.item != null) // 結(jié)點的item域不為null
// Collection.size() spec says to max out
if (++count == Integer.MAX_VALUE) // 增加計數(shù),若達(dá)到最大值,則跳出循環(huán)
break;
// 返回大小
return count;
}此函數(shù)用于返回ConcurrenLinkedQueue的大小,從第一個存活的結(jié)點(first)開始,往后遍歷鏈表,當(dāng)結(jié)點的item域不為null時,增加計數(shù),之后返回大小。
HOPS(延遲更新的策略)的設(shè)計 通過上面對offer和poll方法的分析,我們發(fā)現(xiàn)tail和head是延遲更新的,兩者更新觸發(fā)時機(jī)為:
tail更新觸發(fā)時機(jī):當(dāng)tail指向的節(jié)點的下一個節(jié)點不為null的時候,會執(zhí)行定位隊列真正的隊尾節(jié)點的操作,找到隊尾節(jié)點后完成插入之后才會通過casTail進(jìn)行tail更新;當(dāng)tail指向的節(jié)點的下一個節(jié)點為null的時候,只插入節(jié)點不更新tail。
head更新觸發(fā)時機(jī):當(dāng)head指向的節(jié)點的item域為null的時候,會執(zhí)行定位隊列真正的隊頭節(jié)點的操作,找到隊頭節(jié)點后完成刪除之后才會通過updateHead進(jìn)行head更新;當(dāng)head指向的節(jié)點的item域不為null的時候,只刪除節(jié)點不更新head。
并且在更新操作時,源碼中會有注釋為:hop two nodes at a time。所以這種延遲更新的策略就被叫做HOPS的大概原因是這個(猜的),從上面更新時的狀態(tài)圖可以看出,head和tail的更新是“跳著的”即中間總是間隔了一個。那么這樣設(shè)計的意圖是什么呢?
如果讓tail永遠(yuǎn)作為隊列的隊尾節(jié)點,實現(xiàn)的代碼量會更少,而且邏輯更易懂。但是,這樣做有一個缺點,如果大量的入隊操作,每次都要執(zhí)行CAS進(jìn)行tail的更新,匯總起來對性能也會是大大的損耗。如果能減少CAS更新的操作,無疑可以大大提升入隊的操作效率,所以doug lea大師每間隔1次(tail和隊尾節(jié)點的距離為1)進(jìn)行才利用CAS更新tail。對head的更新也是同樣的道理,雖然,這樣設(shè)計會多出在循環(huán)中定位隊尾節(jié)點,但總體來說讀的操作效率要遠(yuǎn)遠(yuǎn)高于寫的性能,因此,多出來的在循環(huán)中定位尾節(jié)點的操作的性能損耗相對而言是很小的。
ConcurrentLinkedQueue適合的場景
ConcurrentLinkedQueue通過無鎖來做到了更高并發(fā)量,是個高性能的隊列,但是使用場景相對不如阻塞隊列常見,畢竟取數(shù)據(jù)也要不停的去循環(huán),不如阻塞的邏輯好設(shè)計,但是在并發(fā)量特別大的情況下,是個不錯的選擇,性能上好很多,而且這個隊列的設(shè)計也是特別費力,尤其的使用的改良算法和對哨兵的處理。
整體的思路都是比較嚴(yán)謹(jǐn)?shù)?,這個也是使用了無鎖造成的,我們自己使用無鎖的條件的話,這個隊列是個不錯的參考。
到此這篇關(guān)于Java并發(fā)編程中的ConcurrentLinkedQueue詳解的文章就介紹到這了,更多相關(guān)ConcurrentLinkedQueue詳解內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java與JSON數(shù)據(jù)的轉(zhuǎn)換實例詳解
這篇文章主要介紹了java與JSON數(shù)據(jù)的轉(zhuǎn)換實例詳解的相關(guān)資料,需要的朋友可以參考下2017-03-03
Java 數(shù)據(jù)結(jié)構(gòu)與算法系列精講之背包問題
背包問題是一個非常典型的考察動態(tài)規(guī)劃應(yīng)用的題目,對其加上不同的限制和條件,可以衍生出諸多變種,若要全面理解動態(tài)規(guī)劃,就必須對背包問題了如指掌2022-02-02
手把手帶你分析SpringBoot自動裝配完成了Ribbon哪些核心操作
這篇文章主要介紹了詳解Spring Boot自動裝配Ribbon哪些核心操作的哪些操作,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-08-08

