volatile的原理和記憶體屏障參考《Java並行程式設計的藝術》
原子類原始碼基於JDK8
volatile
修飾的欄位,Java執行緒模型保證所有執行緒看到這個變數值是一致的。
volatile
修飾的變數執行寫操作的時候多出lock
字首指令的程式碼,lock字首的指令會導致
為了提高處理速度,處理器不直接和記憶體通訊,而是先把系統記憶體的資料讀到內部快取後繼續操作,但是操作完不知道何時寫回記憶體。如果對volatile
修飾的變數執行寫操作,將會讓資料寫回到系統記憶體,但是其他執行緒還是使用快取中的舊值,還是會存在問題。所以在多處理器下為了保證每一個處理器快取時一致的,就會實現快取一致性協定,每個處理器通過嗅探匯流排上傳播的資料來檢查自己快取的資料是否過期,如果發現自己快取行中對應的記憶體地址被修改了,就會將當前處理器的快取行設定成無效,當前處理器對這個資料進行修改操作時,會重新從主記憶體拉取最新的資料到快取。
在程式執行時,為了提高效能,處理器和編譯器通常會對指令進行重排序。
為了保證記憶體可見性,Java編譯器在生成指令序列的適當位置會插入記憶體屏障來禁止處理器級別的(指令級別並行重排序,記憶體系統重排序)指令重排序
不同硬體實現記憶體屏障的方式不同,Java記憶體模型遮蔽了這種底層硬體平臺的差異,由JVM來為不同的平臺生成相應的機器碼。
實際使用中,又分為以下四種:
型別 | 解釋 |
---|---|
LoadLoad | 對於Load1,Loadload,Load2 ,確保Load1所要讀入的資料能夠在被Load2和後續的load指令存取前讀入 |
StoreStore | 對於Store1,StoreStore,Store2 確保Store1的資料在Store2以及後續Store指令操作相關資料之前對其它處理器可見(例如向主記憶體重新整理資料)。 |
LoadStore | 對於 Load1; LoadStore; Store2 ,確保Load1的資料在Store2和後續Store指令被重新整理之前讀取 |
StoreLoad | 對於Store1; StoreLoad; Load2 ,確保Store1的資料在被Load2和後續的Load指令讀取之前對其他處理器可見。StoreLoad屏障可以防止一個後續的load指令 不正確的使用了Store1的資料,而不是另一個處理器在相同記憶體位置寫入一個新資料。正因為如此,所以在下面所討論的處理器為了在屏障前讀取同樣記憶體位置存過的資料,必須使用一個StoreLoad屏障將儲存指令和後續的載入指令分開。Storeload屏障在幾乎所有的現代多處理器中都需要使用,但通常它的開銷也是最昂貴的。它們昂貴的部分原因是它們必須關閉通常的略過快取直接從寫緩衝區讀取資料的機制。這可能通過讓一個緩衝區進行充分重新整理(flush),以及其他延遲的方式來實現。 |
JMM為了實現volatile的記憶體語意限制了編譯器重排序和處理器重排序
為了實現volatile的記憶體語意,JMM在volatile讀和寫的時候會插入記憶體屏障
volatile寫的記憶體屏障
這裡的store store 屏障可以保證前面所有普通寫對所有處理器可見,實現了在volatile寫之前寫入快取的最新資料寫回到主記憶體
volatile寫之後的記憶體屏障,避免與後續的volatile讀寫出現重排序,由於虛擬機器器無法判斷volatile寫之後是否需要一個store load屏障,比如在volatile寫之後立即return,為了保證volatile的記憶體語意,JMM十分保守的插入一個store load屏障。
volatile 讀的記憶體屏障
這裡的loadload保證了下面普通讀不可以在volatile讀之前,loadstore保證普通寫不可在volatile之前
即比較並替換,實現並行演演算法時常用到的一種技術。CAS操作包含三個運算元——記憶體位置、預期原值及新值。執行CAS操作的時候,將記憶體位置的值與預期原值比較,如果相匹配,那麼處理器會自動將該位置值更新為新值,否則,處理器不做任何操作。CAS是一條CPU的原子指令(cmpxchg指令),不會造成所謂的資料不一致問題,Unsafe提供的CAS方法(如compareAndSwapXXX)底層實現即為CPU指令cmpxchg。
ABA問題是指在CAS操作時,其他執行緒將變數值A改為了B,但是又被改回了A,等到本執行緒使用期望值A與當前變數進行比較時,發現變數A沒有變,於是CAS就將A值進行了交換操作,但是實際上該值已經被其他執行緒改變過,這與樂觀鎖的設計思想不符合。ABA問題的解決思路是,每次變數更新的時候把變數的版本號加1,那麼A-B-A就會變成A1-B2-A3,只要變數被某一執行緒修改過,改變數對應的版本號就會發生遞增變化,從而解決了ABA問題。
熱點資料更新問題
如果一個資料同時被1000個執行緒更新,那麼存在一個倒黴蛋執行緒自旋1000次才能成功修改,第一個成功的執行緒會導致999個執行緒失敗,999個執行緒必須自旋,依次類推,自旋是消耗CPU資源的,如果一直不成功,那麼會佔用CPU資源。
解決方法:破壞掉for死迴圈,當超過一定時間或者一定次數時,return退出。或者把熱點資料拆分開,最後再彙總
這些問題在後面的原子類程式碼中都有具體的實踐
Java8在java.util.atomic
具有16個類,大致可以分為
AtomicBoolean
AtomicInteger
AtomicLong
AtomicIntegerArray
AtomicLongArray
AtomicReferenceArray
原子更新參照陣列AtomicReference
原子更新參照型別AtomicReferenceFieldUpdater
原子更新參照型別的欄位AtomicMarkableReference
原子更新代標記位的參照型別,可以更新一個布林型別的標記位和參照型別。AtomicStampedReference
原子更新帶有版本號,參照型別,該類把版本和參照型別關聯起來,可以用於原子更新資料和資料的版本號,可以解決CAS出現的ABA問題DoubleAccumulator
Doule型別累加器,支援函數是表示式描述值要如何變化DoubleAdder
Doule型別累加器,支援增大減小多少LongAccumulator
Long型別累加器,支援函數是表示式描述值要如何變化LongAdder
Long型別累加器,支援增大減小多少AtomicBoolean
,AtomicInteger
,AtomicLong
的原理類似,選擇AtomicInteger
看下。
使用 volatile
修飾內部int
型別的value 欄位
private volatile int value;
//value欄位就是用於儲存整形變數的,後續操作也是對這個欄位的CAS操作
volatile
修飾保證了value欄位對所有執行緒的可見性,也保證了對value的修改可以立即重新整理會主記憶體,以及對value的讀取操作也會從主記憶體讀取。
靜態程式碼塊獲取value
對於 AtomicInteger
物件的偏移量
private static final Unsafe unsafe = Unsafe.getUnsafe();
//value欄位偏移量
private static final long valueOffset;
static {
try {
//呼叫Unsafe中的objectFieldOffset 方法獲取value欄位相對的偏移量
//cas操作需要需要知道當前value欄位的地址,
//這個地址是相對AtomicInteger的偏移量,
//知道AtomicInteger的地址再加上偏移就可以直接操作value地址的值了
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
public AtomicInteger(int initialValue) {
value = initialValue;
}
public AtomicInteger() {
//無參構造value 為int 基本型別
}
public final int get() {
return value;
}
public final void set(int newValue) {
value = newValue;
}
這裡沒有進行任何執行緒安全的控制,因為JMM保證了從主記憶體讀取volatile
修飾的變數,和寫入volatile
修飾的變數是原子性的操作
public final int getAndSet(int newValue) {
return unsafe.getAndSetInt(this, valueOffset, newValue);
}
這個方法獲取後賦值value為入參newValue,直接呼叫了Unsafe
的getAndSetInt
方法
public final int getAndSetInt(Object o, long offset, int newValue) {
//記錄CAS修改前的值
int v;
do {
//這裡和unsafe中的普通讀取是存在區別的
//獲取舊值,並賦值給v
v = getIntVolatile(o, offset);
} while (!compareAndSwapInt(o, offset, v, newValue));
//如果CAS修改失敗,說明存在多個執行緒正在進行修改 那麼一直進行CAS
return v;
}
注意這裡的getIntVolatile
是帶有記憶體屏障的讀取volatile變數,如果這裡使用getInt
也許會導致重排序出現
public final int getAndSetInt(Object o, long offset, int newValue) {
//記錄CAS修改前的值
int v;
//獲取舊值,並賦值給v
v = getInt(o, offset);
do {
//導致這裡的CAS永遠不會成功 因為這裡讀取v 是沒有理解從主記憶體重新整理的
} while (!compareAndSwapInt(o, offset, v, newValue));
//如果CAS修改失敗,說明存在多個執行緒正在進行修改 那麼一直進行CAS
return v;
}
public final boolean compareAndSet(int expect, int update) {
//入參依次是當前物件,value偏移量,期望值,更新目標
//當前物件,value偏移量可以定位到value欄位的地址
//執行CAS操作的時候,將記憶體位置的值與預期原值(expect)比較,
//如果相匹配,那麼處理器會自動將該位置值更新為新值(update),
//否則,處理器不做任何操作
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
//如果CAS更新成功返回true 否則返回false
//這個方法不會嘗試自旋到更新成功位置
}
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
public final int getAndDecrement() {
return unsafe.getAndAddInt(this, valueOffset, -1);
}
還是呼叫的Unsafe
的getAndAddInt
方法
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
public final int decrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, -1) - 1;
}
public final int addAndGet(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
}
這幾個方法式JDK8支援函數式介面後新增的方法
getAndAccumulate
public final int getAndAccumulate(int x,
IntBinaryOperator accumulatorFunction) {
int prev, next;
do {
//舊值
prev = get();
//CAS將設定成的值 呼叫IntBinaryOperator獲取
next = accumulatorFunction.applyAsInt(prev, x);
} while (!compareAndSet(prev, next));
return prev;
}
比如說我要實現增大到舊值的x倍,並且返回舊值,那麼就可以使用
//這裡的2 就是增大兩倍,
int doubleReturnPre = ai.getAndAccumulate(2, (pre, x) -> pre * x);
accumulateAndGet
public final int accumulateAndGet(int x,
IntBinaryOperator accumulatorFunction) {
int prev, next;
do {
prev = get();
next = accumulatorFunction.applyAsInt(prev, x);
} while (!compareAndSet(prev, next));
return next;
}
和getAndAccumulate
不同在於返回是CAS更新成功的值,意味著下面這行程式碼返回的是增大後的值,而不是增大前的值
//這裡的2 就是增大兩倍,
int doubleReturnNew = ai.accumulateAndGet(2, (pre, x) -> pre * x);
updateAndGet
public final int updateAndGet(IntUnaryOperator updateFunction) {
int prev, next;
do {
prev = get();
next = updateFunction.applyAsInt(prev);
} while (!compareAndSet(prev, next));
return next;
}
IntUnaryOperator的applyAsInt只接受一個引數,這裡傳入了當前值,可以在applyAsInt中定義如何更新。updateAndGet返回新值
getAndUpdate
public final int getAndUpdate(IntUnaryOperator updateFunction) {
int prev, next;
do {
prev = get();
next = updateFunction.applyAsInt(prev);
} while (!compareAndSet(prev, next));
return prev;
}
和updateAndGet類似,返回的是舊值
lazySet
public final void lazySet(int newValue) {
unsafe.putOrderedInt(this, valueOffset, newValue);
}
lazySet提供一個store store屏障(在當代系統中是很低成本的操作,或者說沒有操作),但是沒有store load屏障,我的理解是把volatile的寫後store load替換成了store store,Store load屏障可以讓後續的load指令對其他處理器可見,但是需要將其他處理器的快取設定成無效讓它們重新從主層讀取,store store,是保證後續處理器在寫volatile變數的時候可以看見lazyset方法改變的值,但是後續的讀不保證一定可見,但是對於volatile變數的讀自然是會讀到最新值的,從而減少了開銷。lazySet的lazy 意味著最終資料的一致性,但是當前是進行了偷懶的(指store store替代了storeload)
原始碼基本上和AtomicInteger類似,但是並不是底層存的布林型別,而是使用int型別,0代表false,1代表true
和AtomicInteger類似
AtomicIntegerArray
,AtomicLongArray
,AtomicReferenceArray
的原理類似,陣列型別更新的問題在於,我要更新下標為i
的元素,我怎麼知道i這個元素的地址。如果我們知道第一個元素相對於物件的偏移base,和每個元素的偏移s,那麼第i個元素就是base+i*s
private static final int base = unsafe.arrayBaseOffset(int[].class);
private static final int shift;
static {
//每個元素的大小
int scale = unsafe.arrayIndexScale(int[].class);
//必須是2的n次冪
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
//每個元素大小為4 那麼第n個大小偏移就是n*4 也就是n<<2
//shift 是31 - scale的前導0 方便後續進行位移操作獲取第n個元素相對於第一個的偏移量
shift = 31 - Integer.numberOfLeadingZeros(scale);
}
//前置檢查
private long checkedByteOffset(int i) {
if (i < 0 || i >= array.length)
throw new IndexOutOfBoundsException("index " + i);
return byteOffset(i);
}
//第n個元素的位置
//i*4+base==> i<<2 + base
private static long byteOffset(int i) {
return ((long) i << shift) + base;
}
public final int get(int i) {
return getRaw(checkedByteOffset(i));
}
private int getRaw(long offset) {
//呼叫getIntVolatile 保證了可見性
return unsafe.getIntVolatile(array, offset);
}
//同樣設定也是呼叫putIntVolatile
public final void set(int i, int newValue) {
unsafe.putIntVolatile(array, checkedByteOffset(i), newValue);
}
其他方法和AtomicInteger
中大差不大都是呼叫Unsafe中的相關方法
AtomicReference
還是老套路,不多贅述
AtomicReferenceFieldUpdater
是一個抽象類,使用的時候必須呼叫newUpdater(持有欄位類的class,欄位型別,欄位名稱)
來獲取它的實現AtomicReferenceFieldUpdaterImpl
(呼叫了AtomicReferenceFieldUpdaterImpl
的構造方法涉及一些類載入器知識)後續的更新也是呼叫unsafe的cas相關操作
AtomicMarkableReference
可以同時更新參照和參照的標記,上面我們提到CAS的一個缺點——ABA問題,比如說,當前商店存在一個活動,如果賬戶辦理衝一百送50,每個賬戶依次機會,A使用者充值後獲得150元立馬消費成0元接著充值100,如果用普通的原子類AtomicInteger
程式還會再次送50元給使用者A(ABA問題,程式不知道是否贈送過了),我們可以使用鎖充值後獲取鎖往集合裡面記錄當前使用者贈送了,也可以使用AtomicMarkableReference
通過更新mark
來記錄使用者贈送過了
AtomicMarkableReference
內部維護了一個Pair
,並且private volatile Pair<V> pair
持有一個pair
compareAndSet(舊參照,新參照,舊標記,新標記)
public boolean compareAndSet(V expectedReference,
V newReference,
boolean expectedMark,
boolean newMark) {
//當前AtomicMarkableReference 中的 Pair 記錄參照和 標記
Pair<V> current = pair;
return
//舊參照和Pair中參照相同,舊標記和Pair中的標記相同
expectedReference == current.reference &&
expectedMark == current.mark
&& //這裡是且
//新參照相同 且 新標記相同
((newReference == current.reference &&
newMark == current.mark)
|| //這裡是或
//CAS修改pair屬性
casPair(current, Pair.of(newReference, newMark)));
}
也就是說,首先要求舊值是和當前pair相同的,如果修改之前被其他執行緒修改了那麼短路返回false,如果參照從始至終都沒改變,那麼都不需要CAS操作,否則CAS pair屬性,下面是casPair
的原始碼——還是老套路
private boolean casPair(Pair<V> cmp, Pair<V> val) {
return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}
attemptMark(舊參照,新標記)
public boolean attemptMark(V expectedReference, boolean newMark) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
(newMark == current.mark ||
casPair(current, Pair.of(expectedReference, newMark)));
}
和compareAndSet
的區別在於,其只要求參照相同,如果mark相同那麼什麼都不做,反之CAS修改pair
AtomicStampedReference
也是用來解決ABA問題的,不同的是其標記不只是true和false,可以是1,2,3等等等版本號,我們把AtomicMarkableReference
中活動改下,每一個賬戶可以參與3次活動,那麼在充值的時候我們把版本號加1,最後版本號來到3 表示這個賬戶參與了3次,後續充值就不贈送了。
AtomicStampedReference
實現和AtomicMarkableReference
簡直一模一樣,區別在於AtomicStampedReference
中Pair類是參照和版本號
上面我們提到CAS的缺點說到存在熱點資料更新導致多數執行緒失敗自旋的問題,其中一個解決辦法是自旋次數,失敗就返回活動太火爆
這種勸退訊息,另外一種解決辦法是——熱點資料拆分開,最後再彙總。這個思路和ConcurrentHashMap
的分段鎖思路類似,既然我如同HashTable
導致效能低下(修改key A和B都受一把鎖的影響)那麼我把資料,不同的資料使用不同的鎖,就可以提高吞吐量了。在累加器中的體現就是,在最初無競爭時,只更新base的值,當有多執行緒競爭時通過分段的思想,讓不同的執行緒更新不同的段,最後把這些段相加就得到了完整儲存的值。
累加器的思路都類似,我們選擇LongAdder 和 LongAccumulator來看下
LongAdder 內部有base用於在沒有競爭的情況下,進行CAS更新,其中還有Cell陣列在衝突的時候根據執行緒唯一標識對Cell陣列長度進行取模,讓執行緒去更新Cell陣列中的內容。這樣最後的值就是 base+Cell陣列之和,LongAdder自然只能保證最終一致性,如果邊更新邊獲取總和不能保證總和正確。
這裡比較迷惑的就是Striped64
這個類,此類是一個內部類,用於實現 Adder 和 Accumulator,我們上面所說的base,Cell陣列其實就是在此類中的。
此類位於Striped64
中,就是我們上面說的Cell陣列進行熱點資料分離的Cell
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
//老套路 unsafe
private static final sun.misc.Unsafe UNSAFE;
//value 欄位的偏移量
private static final long valueOffset;
static {
//初始化 獲取unsafe 範例 以及獲取value 偏移量
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
Cell
這個類還是老套路,唯一不同的是它類上面具有一個註解 @sun.misc.Contended
此註解會進行快取填充,避免快取偽共用
(這部分內容在文末);
NCPU
記錄了系統 CPU 的核數,因為真正的並行數最多隻能是 CPU 核數,因此 cells
陣列一般要大於這個數。cells
陣列,大小是 2 的次方,這樣將執行緒對映到 cells
元素時方便計算。base
,基本數值,一般在無競爭能用上,同時在 cells
初始化時也會用到。cellsBusy
,自旋鎖,在建立或擴充 cells
時使用void add(long x)
public void increment() {
add(1L);
}
public void decrement() {
add(-1L);
}
//LongAdder中增大和減小都是直接呼叫的add(long x) 方法
public void add(long x) {
//as ——Cells陣列的參照
//b 獲取到的base值
//v 期望值
//m 標識Cells陣列的長度-1
//a 標識當前執行緒命中的Cell單元格
Cell[] as; long b, v; int m; Cell a;
//如果 cells陣列初始化了(Striped64是懶惰的初始化,沒有執行緒競爭的時候cells陣列不會被初始化)
// 或者 cas的修改base值 失敗了(說明多個執行緒都在嘗試cas修改,出現了競爭)
if ((as = cells) != null || !casBase(b = base, b + x)) {
//沒有發生衝突的標識
boolean uncontended = true;
//as == null || (m = as.length - 1) < 0 表示如果cell陣列為空
if (as == null || (m = as.length - 1) < 0 ||
//或者當前執行緒的cell單元沒有初始化
(a = as[getProbe() & m]) == null ||
//或者cas修改base失敗了
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
A.對於第一個if
從來沒有發生過競爭
並行量很低的時候Cell
陣列就是空,這個時候第一個if中的 (as = cells) != null
就是false 會繼續執行後續的!casBase(b = base, b + x)
這一步會cas的更新bese 值
之前發生過競爭
這個時候第一個if的 (as = cells) != null
就成立了 ,不會走第一個if中的cas操作,直接進入第二個if
B.對於第二個if
進入第二個if,當前執行緒需要把值更新到對應的cell中
as == null || (m = as.length - 1) < 0
這意味著cell陣列沒有初始化,也就是說這是第一次存在高並行競爭的情況,那麼呼叫longAccumulate
這個方法會幫我們初始化cell陣列的
(a = as[getProbe() & m]) == null
這意味著,cell陣列初始化了,但是當前執行緒標識取模陣列長度得到當前執行緒應該更新的cell為空
threadLocalRandomProbe
欄位,欄位沒有初始化的時候預設是0如果當前執行緒所屬的Cell為空,那麼也會呼叫longAccumulate
這裡我們要關注一點
getProbe 方法初始的時候都是0,0取模任何數都是0
那麼每一個執行緒最開始都會分配第一個Cell,
那麼第一個Cell為空意味著什麼暱,
這個問題需要我們看完longAccumulate 方法才能揭曉
其實probe=0在longAccumulate方法中意味著
當前執行緒沒有和其他執行緒發生衝突更新
在longAccumulate 會初始化probe 設定衝突更新表示為false
!(uncontended = a.cas(v = a.value, v + x))
這裡是呼叫Cell的cas方法,就是更新Cell物件中的value欄位,如果這裡cas失敗了,說明當前存在一個執行緒也在更新當前cell物件的value,兩個執行緒要更新一個cell,說明出現了衝突,也會呼叫longAccumulate
進行自旋更新cell單元格中的值。
Striped64
#longAccumulatelongAccumulate 方法非常長,我們拆看慢慢看
初始化threadLocalRandomProbe
//如果是0 表示沒有是沒有初始化的
//這裡會為當前執行緒生成一個probe
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
//設定為沒有競爭,
wasUncontended = true;
}
自旋保證當前執行緒能把值寫入
A.如果Cell陣列已經成功初始化,下面都是A的子情況
情況1:如果當前執行緒threadLocalRandomProbe取模後對應的cell為空,那麼我們需要在當前執行緒對應的位置new一個cell賦值上去
//as——cells陣列參照
//a 當前執行緒對於的cell
//n 當前陣列長度
//v 期望值
//h 當前執行緒的threadLocalRandomProbe
//x是當前執行緒要增加的值
Cell[] as; Cell a; int n; long v;
//如果cells初始化了
if ((as = cells) != null && (n = as.length) > 0) {
//如果當前執行緒threadLocalRandomProbe取模後對於的cell為空
//==========程式碼點1(後續解析中會使用到)==============
if ((a = as[(n - 1) & h]) == null) {
//cellsBusy是一個自旋鎖保證Cell陣列的執行緒安全
//0代表無執行緒調整Cell陣列大小or或建立單元格
//1 則反之
//==========程式碼點2(後續解析中會使用到)==============
if (cellsBusy == 0) {
//為當前執行緒建立一個cell,
//x直接賦值給其中的value 後續求和會加上這個x,從而實現增加
Cell r = new Cell(x);
//0代表無執行緒調整Cell陣列大小or或建立單元格
//casCellsBusy 是從0設定成1 表示當前執行緒嘗試獲取這把鎖
//==========程式碼點3(後續解析中會使用到)==============
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try {
Cell[] rs; int m, j;
//重新判斷cell陣列初始化了,且當前cell是空
//看下方解析為何需要重新
//==========程式碼點4(後續解析中會使用到)==============
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
//設定到cell陣列上
rs[j] = r;
created = true;
}
} finally {
//釋放鎖
cellsBusy = 0;
}
//如果這裡成功建立了cell,說明成功把值加上去了
//那麼退出自旋
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
//....省略部分程式碼,這部分也會在後續解析
//重新重新整理當前執行緒的Probe
//==========程式碼點5(後續解析中會使用到)==============
h = advanceProbe(h);
}
這裡比較有意思有
在判斷(a = as[(n - 1) & h]) == null
即當前執行緒對應的cell為空(程式碼點1
),首先在程式碼點2
是判斷了cellsBusy == 0
說明當前無執行緒在建立Cell單元格的,然後new了一個Cell,繼續在程式碼點3
還是會判斷cellsBusy == 0
,是由於我們在new一個cell的過程中可能存在消耗完時間片的情況,然後其他執行緒恰好可能已經獲得到了cellsBusy
這把鎖,這裡再次判斷cellsBusy
反之無腦獲取鎖執行casCellsBusy
,可以說doug lea真的是效能狂魔
在程式碼點4
,來到程式碼點4
其實已經在程式碼點1
處已經判斷了當前執行緒對應的Cell單元格為空啊,為什麼這裡還要判斷一次暱,因為可能在當前new 一個cell的這段時間有另外一個執行緒也設定了這個位置的Cell,或者改變了cell陣列,並且釋放了cellsBusy
鎖,為了保證此位置的Cell單元格的值不被當前執行緒無腦覆蓋,所有再次進行了判斷。
什麼時候會結束自旋,這段程式碼其實給出了一個答案——created為true
這裡的created
只會在當前執行緒成功設定其對應的單元格為new Cell(增加的值)
時為true,也就代表著當前執行緒已經成功進行了一個增加操作
什麼時候會繼續自旋
程式碼點2
處的if (cellsBusy == 0)
不成立
這意味著,當前執行緒對應的Cell為空,但是存在其他執行緒正在調整Cell陣列大小or或建立單元格,為了保證Cell陣列中的值不被覆蓋,這個時候會執行到程式碼點5
呼叫advanceProbe
重新為當前執行緒生成一個probe
//使用位元運算,把當前執行緒的probe隨機打散,為啥這裡這樣進行位元運算
//我只能說,可能時doug lea研究後的,或者他喜歡這個幾個數位
//但是這幾個陣列都是質數,大概率後面是存在理論支撐的,
static final int advanceProbe(int probe) {
probe ^= probe << 13; // xorshift
probe ^= probe >>> 17;
probe ^= probe << 5;
UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
return probe;
}
程式碼點3
處的if(cellsBusy == 0 && casCellsBusy())
不成立
和上面的1差不多
if (created)
不成立
這意味,程式碼點4
處的判斷不成立,說明存線上程A已經完成了cell陣列的擴容,導致當前執行緒對應的Cell改變了(陣列長度擴大2,probe%長度=n,可能是原來的位置n,也可以是n+當前長度/2 )也可能是執行緒A給當前執行緒對應Cell單元格賦值了(執行緒A的probe對陣列長度取模後和當前執行緒相同,但是執行緒A搶先一步設定了單元格)但是這時候不會呼叫到advanceProbe
因為可以沿用之前的probe找到對應的位置進行設定值,這個坑位還是可以設定值的只是有人搶先一步了,不能直接new Cell(x)
,需要讓這個Cell值增加x
,但是1和2呼叫advanceProbe
的原因是,為了提升效能,讓他隨便找個其他坑位做增加的操作。
再次給看跪了,doug lea真效能狂魔
情況2:如果在LongAdder#add
方法中對應Cell進行CAS失敗,那麼rehash後繼續自旋
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
//省略了情況1的程式碼
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true;
//....省略部分程式碼,這部分也會在後續解析
//重新重新整理當前執行緒的Probe
h = advanceProbe(h);
wasUncontended
這個變數位false只可能是呼叫longAccumulate
這個方法入參就為false,讓我們回到LongAdder#add
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
//初始為true
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
//注意這裡
!(uncontended = a.cas(v = a.value, v + x)))
//要讓uncontended為false
//那麼說明上面的a.cas(v = a.value, v + x)失敗了
longAccumulate(x, null, uncontended)
}
}
這裡我們可以看到,必須是當前執行緒對其cell進行cas操作失敗才可能為false,這裡的false意味著,當前並行很高,有幾個老六執行緒在對這個一個cell進行cas,那麼這個時候會執行到else if (!wasUncontended) wasUncontended = true
然後執行advanceProbe
,這意味著,只能說當前執行緒命不好
執行重新rehash下probe
換一個Cell單元格進行操作,可以理解為Java就業太捲了,換Go語言了。這樣做的好處是提高了其他cell單元格的利用率,效能up,這裡把wasUncontended
隨後設定為true,可以理解為,當前執行緒都要rehash了,後續發生還不行那就是「崗位不夠了」得擴容Cell陣列了,後續也就用不著wasUncontended
情況3:成功把值增加到對應的Cell
if ((as = cells) != null && (n = as.length) > 0) {
//省略講過的程式碼。。。
// 成功把值增加到對應的cell
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
//自旋結束
break;
這裡出現了自旋退出的另外一個情況,那就是當前執行緒成功把增加的值設定到其對應的cell單元格,這時候結束自旋,很合理。
這裡出現了一個fn
指的是呼叫當前方法傳入的LongBinaryOperator
是一個函數式介面。LongAdder的add方法預設傳入的是空,會執行v + x
也就是增加cell單元格的值,這個LongBinaryOperator
在LongAccumulator
使用到,後續我們看下
@FunctionalInterface
public interface LongBinaryOperator {
long applyAsLong(long left, long right);
}
情況四:對Cell陣列進行擴容
如果並行實在是太大了,Cell陣列單元格的數量已經容納不下這麼多執行緒一起執行了,那麼為了避免想AtomicLong
一樣無腦自旋,浪費CPU,這時候會選擇對Cell陣列進行擴容。
if ((as = cells) != null && (n = as.length) > 0) {
//省略講過的程式碼。。。
//==========程式碼點1(後續解析中會使用到)==============
//collide 表示擴容的意向,為true並不代表一定會擴容
//如果cell陣列的長度大於了jvm可以使用的核心數 或者cells陣列參照改變了
else if (n >= NCPU || cells != as)
collide = false;
//==========程式碼點2(後續解析中會使用到)==============
else if (!collide)
collide = true;
//拿到cellsBusy這把鎖
else if (cellsBusy == 0 && casCellsBusy()) {
try {
//判斷下cells參照沒有改變
//==========程式碼點3(後續解析中會使用到)==============
if (cells == as) {
//擴容 擴大1倍
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
//改變cells應用指向
cells = rs;
}
} finally {
//釋放鎖
cellsBusy = 0;
}
//擴容意向為false
collide = false;
continue;
}
//rehash probe
h = advanceProbe(h);
}
這裡有意思的點有
(n >= NCPU || cells != as)
如果cell陣列長度已經大於等於jvm可以使用的cpu核心數了,或者cells參照指向改變了,那麼擴容意向設定為false,然後執行advanceProbe
對當前執行緒的probe進行rehash
java執行緒模型的學習筆記中,我們指出,java執行緒和作業系統是一對一模型,我理解這裡一個cpu核心執行在一個時刻執行一個執行緒,所以cells陣列太大也沒什麼用。那麼為什麼cells != as
成立也是進行rehash probe然後繼續自旋暱,這裡可以理解為當前執行緒嘗試對它對應的cell單元格進行cas操作,但是失敗了,這個時候發現cells != as
說明有其他執行緒對當前cell陣列進行了擴容,從而改變了cells陣列的參照指向(as是就的cells陣列)為了防止多次擴容,這個時候就設定以下擴容意向為false 然後讓當前執行緒「從卷java,轉變為卷Go」換一個並行不那麼高的Cell陣列單元格進行cas操作。
程式碼點2
進入到這裡的情況有 (a = as[(n - 1) & h]) == null
但是被其他執行緒初始化了對應位置的Cell,cas設定對應cell失敗,cell陣列已經達到jvm可用cpu,當前執行緒執行的途中沒有其他執行緒完成擴容。但是當前還是無法在自己對應的cell上成功進行cas,說明和其他執行緒發生了衝突,這個時候讓當前執行緒rehash以下probe然後再次自旋一次,如果還是無法在自己對應的cell進行cas操作,且沒有發生擴容的話會來到下面的3
拿到鎖,對cell陣列進行擴容,進入這裡,說明沒有其他執行緒進行擴容,當前執行緒對應的cell不為null,但是對cell進行CAS操作還是失敗。這時候為了提高效能只能犧牲一點空間了,進行擴容。有意思的點在於程式碼點3
,為什麼在這裡還是需要進行一次判斷暱,因為cellsBusy == 0 && casCellsBusy()
這兩個操作不是原子的,可能cellsBusy == 0
執行完失去了時間片,這時候有一個老六進行了擴容,改變了cells陣列參照指向,並且釋放了鎖,這時候如果不做這個判斷,可能導致cell陣列元素的丟失。後續就是對cells進行擴容,然後釋放鎖,設定擴容意向為false,然後continue
,注意這個continue
,這會導致當前執行緒不會執行advanceProbe
,為什麼暱,哥們都擴容了,你現在讓我換個格子,那我為啥要擴容,屬於是「Java太卷,但是我命由我不由天,為自己創造崗位」,後續這個執行緒進行自旋的時候隨機到的Cell陣列可能還是原來的,可能是原來位置加上當前cell陣列長度的一半,但是還是可以把一些「競爭者」分散開了
B.當前Cell陣列沒有初始化,當前執行緒進行初始化
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
//是否完成了初始化
boolean init = false;
try {
//==========程式碼點1(後續解析中會使用到)==============
//確認沒有其他老六搶先初始化
if (cells == as) {
// 初始化
Cell[] rs = new Cell[2];
//選擇一個格子 設定為x,probe奇數那麼選擇rs[1] 反之rs[0]
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
//釋放鎖
cellsBusy = 0;
}
//如果成功初始化 那麼結束
if (init)
break;
}
這裡有意思的點,在程式碼點1
還是會進行cells == as
的判斷,這是由於cellsBusy == 0 && cells == as && casCellsBusy()
並不是一個原子操作,可能存在其他執行緒,搶先初始化cell陣列,所以需要再次判斷以下。這裡我們可以看到初始化的cell陣列大小為2,後續都是擴大一倍
C.Cell沒有初始化,但是當前執行緒嘗試初始化失敗,嘗試操作base值
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
來到這裡,說明A和B都是不成立的,也就意味著,當前執行緒進來的時候發現cell沒有初始化,然後來到B,但是cellsBusy == 0 && cells == as && casCellsBusy()
發現不成立,不成立的情況有
cellsBusy == 0
不成立,說明之前有執行緒已經拿到鎖了,正在初始化cells == as
不成立,有一個執行緒已經完成了初始化,導致cell參照指向改變casCellsBusy()
不成立,競爭鎖的過程中失敗了這個時候會讓當前執行緒嘗試更新下base值,說不定很多執行緒都在嘗試更新cell元素,這個時候更新下base 可能也許會成功。
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
沒什麼好說的,強於doug lea也只能保證最終一致性,顯然如果存在其他執行緒並行add的時候,這個方法只能拿到快照
資料
public void reset() {
Cell[] as = cells; Cell a;
base = 0L;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
a.value = 0L;
}
}
}
沒什麼好說的,執行緒不安全,如果存在其他執行緒add,這時候呼叫reset,可能導致並沒有reset成功,或者說如果其他執行緒擴容到一般,呼叫reset,那麼reset也會不成功。還有一點是reset並不會改變cell陣列大小
public long sumThenReset() {
Cell[] as = cells; Cell a;
long sum = base;
base = 0L;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null) {
sum += a.value;
a.value = 0L;
}
}
}
return sum;
}
求和並設定為0並不會改變cell陣列大小。
大致和LongAdder類似,LongAccumulator
需要指定如何如果操作 ——LongBinaryOperator(舊值沒有衝突時時base,衝突時是cell,accumulate傳入的值)
public LongAccumulator(LongBinaryOperator accumulatorFunction,
long identity) {
this.function = accumulatorFunction;
base = this.identity = identity;
}
public void accumulate(long x) {
Cell[] as; long b, v, r; int m; Cell a;
//cell陣列沒有初始化
if ((as = cells) != null ||
//或者 需要更新,cas失敗
(r = function.applyAsLong(b = base, x)) != b && !casBase(b, r)) {
boolean uncontended = true;
//cell陣列沒有初始化
if (as == null || (m = as.length - 1) < 0 ||
//或者當前執行緒對應的 cell為null
(a = as[getProbe() & m]) == null ||
!(uncontended =
//需要更新
(r = function.applyAsLong(v = a.value, x)) == v ||
//或cas失敗
a.cas(v, r)))
//這裡傳入了 function 在 longAccumulate中就不是簡單的自增了
longAccumulate(x, function, uncontended);
}
}
public long get() {
Cell[] as = cells; Cell a;
long result = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
result = function.applyAsLong(result, a.value);
}
}
return result;
}
這一這裡返回的值,不是進行累加而是function.applyAsLong(result, a.value)
取決於你定義的操作——LongBinaryOperator
和LongAdder 與LongAccumulator 類似,但是是通過把Double轉換成Long呼叫doubleAccumulate
來完成的
我們在解析LongAdder
原始碼的時候看到 Striped64
中的Cell
類上面存在一個@sun.misc.Contended
的註解,我們說這是為了反正快取偽共用,下面我們聊下啥是快取偽共用
@sun.misc.Contended static final class Cell {
volatile long value;
//省略部分程式碼
}
快取是由快取行組成的,通常一個快取行是 64 位元組,在程式執行的過程中,快取每次更新都從主記憶體中載入連續的 64 個位元組。因此,如果存取一個 long 型別的陣列時,當陣列中的一個值被載入到快取中時,另外 7 個連續的元素也會被載入到快取中,地址上不連續的就不會載入到同一個快取行了。這種免費載入也有一個壞處。設想如果我們有個 long 型別的變數 a,它不是陣列的一部分,而是一個單獨的變數,並且還有另外一個 long 型別的變數 b 緊挨著它,那麼當載入 a 的時候將免費載入 b。如果一個 CPU 核心的執行緒在對 a 進行修改,另一個 CPU 核心的執行緒卻在對 b 進行讀取,當前者修改 a 時,會把 a 和 b 同時載入到前者核心的快取行中,更新完 a 後其它所有包含 a 的快取行都將失效,因為其它快取中的 a 不是最新值了,而當後者讀取 b 時,發現這個快取行已經失效了,需要從主記憶體中重新載入
。這就很坑爹了,我只是想更新a,但是卻讓有效的b無效了。
只要我填一些無用的位元組,在a和b之間,讓a和b不在一個快取行中就解決了這個問題,但是現在虛擬機器器很聰明,會對我們手動填充的無用位元組進行忽視
@sun.misc.Contended
這也是一種填充無用位元組的做法,但是是jvm幫我填充。
如下Long1這個類標註了@sun.misc.Contended
我們在啟動的jvm的時候加上 -XX:-RestrictContended
對比不加 @sun.misc.Contended
註解的時候,其實有很大的差別(幾個數量級的差距)
public static void main(String[] args) {
test2();
}
private static void test2() {
Long1 long1 = new Long1();
CountDownLatch latch = new CountDownLatch(2);
long start = System.currentTimeMillis();
new Thread(() -> {
for (int i = 0; i < 1000000000; i++) {
long1.l1++;
}
latch.countDown();
}).start();
new Thread(() -> {
for (int i = 0; i < 1000000000; i++) {
long1.l2++;
}
latch.countDown();
}).start();
try {
latch.await();
long end = System.currentTimeMillis();
System.out.println(end - start);
} catch (InterruptedException e) {
}
}
@Contended
static class Long1 {
private volatile long l1;
private volatile long l2;
}
Cell
要加@sun.misc.Contended
如果使用@sun.misc.Contended
那麼ACell 和BCell不在一個緩衝行,就不會發生這樣的情況了,從主記憶體載入資料到快取還是需要消耗一定時間的。