java高并發(fā)下解決AtomicLong性能瓶頸方案LongAdder
一、 LongAdder簡介
LongAdder類是JDK1.8新增的一個原子性操作類。上一節(jié)說到,AtomicLong通過CAS提供了非阻塞的原子性操作,相比用阻塞算法的synchronized來說性能已經(jīng)得到了很大提升。在高并發(fā)下大量線程會同時競爭更新同一個原子變量,但由于只有一個線程的CAS操作會成功,這就造成了大量線程競爭失敗后,會通過無限循環(huán)不斷進行自旋嘗試CAS操作,這會白白浪費CPU資源。
為了解決AtomicLong在高并發(fā)下的缺點,LongAdder應運而生。LongAdder采用的思路是:既然AtomicLong由于過多線程同時去競爭同一個變量的更新而產(chǎn)生性能瓶頸,那么把一個變量分解為多個變量,讓同樣多的線程去競爭多個資源,就解決了性能問題。
如圖4-1:

使用AtomicLong時,是多個線程同時競爭同一個原子變量。
如圖4-2:

使用LongAdder時,則是內(nèi)部維護多個Cell變量,每個Cell里面有一個初始值為0的long型變量,在同等并發(fā)量的情況下,爭奪單個變量的線程會減少,這是變相地減少了爭奪共享資源的并發(fā)量。另外,多個線程在爭奪同一個Cell原子變量時候,如果失敗并不是自旋CAS重試,而是嘗試獲取在其他Cell原子變量上進行CAS嘗試,這增加了當前線程重試CAS成功的可能性。最后,在獲取LongAdder當前值時,是把所有Cell變量的value值累加后再加上base值返回的。
LongAdder維護了一個Cells數(shù)組和一個基值變量base,Cells數(shù)組的特點如下:
- 延遲初始化(默認情況下Cells為null)。因為Cells占用內(nèi)存相對較大,故惰性加載。Cells為null時且并發(fā)線程較少時,所有的累加操作都是對base變量進行的。
- 初始化時,Cells數(shù)組中Cell元素個數(shù)為2;同時,Cell數(shù)組中元素個數(shù)保存為2的N次方。
Cell 類是AtomicLong的一個改進,用來減少緩存的爭用,即解決偽共享問題。對于大多數(shù)孤立的多個原子操作進行字節(jié)填充是浪費的,因為原子操作都是無規(guī)律地分散在內(nèi)存中進行的,多個原子變量被放入同一個緩存行的可能性很小。但是原子性數(shù)組元素的內(nèi)存地址是連續(xù)的,故數(shù)組內(nèi)的多個元素能經(jīng)常共享緩存行(偽共享),因此Cell類使用了@sun.misc.Contended注解進行字節(jié)填充,這是為了防止數(shù)組中多個元素共享一個緩存行,從而提升性能。
二、LongAdder代碼分析
為了解決高并發(fā)下多線程對一個變量 CAS 爭奪失敗后進行無限自旋而造成的降低并發(fā)性能的問題,LongAdder在內(nèi)部維護了一個動態(tài)的Cell數(shù)組來分擔對單個變量進行爭奪的開銷。
這一節(jié)我們來圍繞以下話題對LongAdder的實現(xiàn)進行分析:
- (1)LongAdder的結構
- (2)當前線程應該訪問Cells數(shù)組里的哪一個Cell元素
- (3)如何初始化Cells數(shù)組
- (4)Cells數(shù)組的擴容機制
- (5)線程訪問所分配的Cell元素有沖突后如何處理
- (6)如何保證線程操作被分配的Cell元素的原子性
(1)LongAdder的結構
如圖,LongAdder類繼承自Striped64類:

先混個眼熟,Striped64類是一個高并發(fā)累加的工具類。其特點為:
- 設計核心思路就是通過內(nèi)部的分散計算來避免競爭。
- 內(nèi)部包含一個base和一個Cells數(shù)組
- 沒有競爭的情況下,要累加的數(shù)通過CAS累加到base上;如果有競爭的話,會將Cells數(shù)組中的每個Cell的value值累加,再加上base值返回。
Striped64類內(nèi)部維護三個重要的變量:
- transient volatile Cell[] cells; // 存放Cell的數(shù)組,大小為2的N次方
- transient volatile long base; // 基礎值,默認為0
- transient volatile int cellsBusy; // 用來實現(xiàn)自旋鎖,狀態(tài)值只有0和1,通過CAS操作該變量來保證只有一個線程可以創(chuàng)建Cell元素、初始化Cells數(shù)組、對Cells數(shù)組擴容
上文中出現(xiàn)頻率很高的Cell長啥樣?下面我們來揭秘一下。
Cell結構如下:
? @sun.misc.Contended
static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
簡單分析一下:
- 為提高性能,使用注解@sun.misc.Contended修飾,用來避免偽共享。
- value變量被聲明為volatile,為了保證變量的內(nèi)存可見性。
- cas方法通過CAS操作,保證了當前線程更新時被分配的Cell元素中value值的原子性。
(2)add方法實現(xiàn)
從add方法的代碼中,我們可以找到開頭里很多問題的答案。
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {//(1)
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 || //(2)
(a = as[getProbe() & m]) == null ||//(3)
!(uncontended = a.cas(v = a.value, v + x)))//(4)
longAccumulate(x, null, uncontended);//(5)
}
}
final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}
- (1)處代碼表明:如果cells為null,則add方法實現(xiàn)的效果是在base變量上累加x,這是與AtomicLong的操作類似。
- 當cells不為null或者線程執(zhí)行代碼(1)處的CAS操作失敗了,就會執(zhí)行代碼(2);代碼(2)(3)決定了當前線程應該訪問Cells數(shù)組里哪一個Cell元素,如果當前線程映射的Cell元素存在則執(zhí)行代碼(4),使用CAS操作去更新Cell元素的value值
- 如果當前線程映射的Cell元素不存在或者存在,但是CAS操作失敗則執(zhí)行代碼(5)。
總結來看就是,(2)(3)(4)處的代碼就是獲取當前線程應該訪問的Cells數(shù)組中的Cell元素,然后進行CAS更新操作,在獲取期間如果有條件不滿足就會跳到代碼(5)執(zhí)行。
另外,當前線程應該訪問Cells數(shù)組的哪一個Cell元素是通過getProbe() & m 進行計算的,其中m是當前Cells數(shù)組元素個數(shù)-1,getProbe()則用于獲取當前線程中變量threadLocalRandomProbe的值(默認為0,代碼(5)將其初始化),并且當前線程通過分配的Cell元素的cas函數(shù)來保證對Cell元素更新的原子性。到此,我們解決了問題當前線程應該訪問Cells數(shù)組里的哪一個Cell元素和如何保證線程操作被分配的Cell元素的原子性。
(3)add方法中l(wèi)ongAccumulate方法的實現(xiàn)
longAccumulate方法是cells數(shù)組被初始化和擴容的地方。
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;//(6)
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {//(7)
if ((a = as[(n - 1) & h]) == null) {//(8)
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//(9)
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
//(10)
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
//(11)
else if (!collide)
collide = true;
//(12)
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
//(12.1)
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
//(12.2)
cellsBusy = 0;
}
//(12.3)
collide = false;
continue; // Retry with expanded table
}
//(13)為了能找到一個空閑的Cell,重新計算hash值,xorshift算法生成隨機數(shù)。
h = advanceProbe(h);
}
//(14)
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
//(14.1)
Cell[] rs = new Cell[2];
//(14.2)
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
//(14.3)
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
- 當線程第一次執(zhí)行代碼(6)時,會初始化當前線程變量threadLocalRandomProbe的值,此變量在計算當前線程被分配到Cells數(shù)組的哪一個Cell元素時會使用。
- 當前線程執(zhí)行到代碼(7)(8)時,會根據(jù)當前線程的threadLocalRandomProbe和cells元素個數(shù)計算訪問的Cell元素下標,如果發(fā)現(xiàn)對應下標元素的值為null,則新增一個Cell元素添加到Cells數(shù)組中。但在添加的時候,需要競爭并設置cellsBusy為1。
- 當前線程對應的Cell元素存在,執(zhí)行代碼(9),進行累加操作。
如果Cells數(shù)組不存在時,會發(fā)生什么?
- cells數(shù)組初始化是在代碼(14)中進行的,cellsBusy是一個標識(0:當前數(shù)組沒有在被初始化或者擴容,也未新建Cell元素。1:cells數(shù)組在被初始化或擴容、正在創(chuàng)建新的cell元素),通過CAS操作來進行0或1狀態(tài)的切換,使用
casCellsBusy方法。 - (14.1)初始化cells數(shù)組,長度為2,然后(14.2)使用當前線程的threadLocalRandomProbe值&(cells元素個數(shù)-1)來計算當前線程應該訪問cells數(shù)組的哪個位置。最后(14.3)重置cellsBusy為0,表示初始化完成。這里雖未使用CAS,但卻線程安全,因為cellsBusy是volatile的,保證了內(nèi)存可見性。
- 初始化完的cells數(shù)組,里面的元素還是為null的。創(chuàng)建Cell元素是在(7)(8)中。
Cells數(shù)組擴容是怎么實現(xiàn)的?
- 代碼(12)中,擴容之前需要進行(10)和(11)的判斷,當cells元素個數(shù)小于當前機器CPU個數(shù)且當前多個線程訪問了cells中同一個元素時,從而導致沖突使其中一個線程CAS失敗時才會進行擴容操作。
- 涉及CPU個數(shù)的原因是,只有每個CPU都運行一個線程時,多線程效果最佳。即當Cells元素個數(shù)等于CPU個數(shù)時,每個Cell都使用一個CPU進行處理,效果最佳。
- 代碼(12)中,擴容操作是先通過CAS設置cellsBusy為1,然后才擴容。假設CAS成功則執(zhí)行代碼(12.1)擴容,容量為之前的2倍,并復制Cell元素到擴容后數(shù)組,并復制Cell元素到擴容后的數(shù)組。
線程訪問所分配的Cell元素有沖突后如何處理?
- 代碼(13)已經(jīng)給了我們答案,對CAS失敗的線程重新計算當前線程的threadLocalRandomProbe值,以減少下次訪問cells元素時的沖突機會。
三、總結
最后再問一下自己,看完了源碼分析后,能回答出這六個問題嗎?如果能,那么恭喜你,LongAdder的原理已經(jīng)掌握得差不多了。
- (1)LongAdder的結構
- (2)當前線程應該訪問Cells數(shù)組里的哪一個Cell元素
- (3)如何初始化Cells數(shù)組
- (4)Cells數(shù)組的擴容機制
- (5)線程訪問所分配的Cell元素有沖突后如何處理
- (6)如何保證線程操作被分配的Cell元素的原子性
由于篇幅有限,下一期我們再來看看LongAccumulator類的原理,畢竟這期的內(nèi)容有點多了,講太多也不好消化。這期就到這吧~更多關于java高并發(fā)AtomicLong LongAdder的資料請關注腳本之家其它相關文章!
相關文章
Java實戰(zhàn)之晚會抽獎系統(tǒng)的實現(xiàn)
這篇文章主要介紹了如何利用Java語言編寫一個晚會抽獎系統(tǒng),文中采用到的技術有Jdbc、Servlert、JavaScript、JQuery、Ajax等,感興趣的可以學習一下2022-03-03
基于SpringBoot應用監(jiān)控Actuator安全隱患及解決方式
這篇文章主要介紹了SpringBoot應用監(jiān)控Actuator安全隱患及解決方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-07-07

