原始碼基於jdk1.8
這一片主要講述ConcurrentHashMap如何實現低粒度鎖的插入,如何實現統計元素個數,如何實現並行擴容遷移
支援高並行讀寫的雜湊表,ConcurrentHashMap
中每一個方法都是執行緒安全的,並且讀資料通常不需要加鎖,並行的效能遠優於Hashtable
。
//每一個方法都是執行緒安全,但是複合操作並不一定安全,如下的contains然後put
ConcurrentHashMap<Object, Object> concurrentHashMap = new ConcurrentHashMap<>();
if (!concurrentHashMap.contains("1")){
concurrentHashMap.put("1","1");
}
雖然ConcurrentHashMap 具備很高的並行讀寫,但是對於並行的進行putAll 和get,只能反映瞬時態
。
下面這段程式碼如果執行緒A先拿到Hashtable的全域性鎖,那麼執行緒B一定可以get出值,但是如果把Hashtable換做ConcurrentHashMap即使在時間上putAll先於get,也無法保證一定能get到key1對應值
Hashtable<Object, Object> hashtable = new Hashtable<>();
//執行緒A 執行
hashtable.putAll(hashMap);
//執行緒B 執行
Object v1 = hashtable.get("key1");
ConcurrentHastMap的迭代器不會丟擲ConcurrentModificationException
但是可以遍歷到元素只有建立迭代器時的那部分。
ConcurrentHashMap支援自動並行擴容,注意這個擴容是並行的,意味著擴容的期間讀寫並不會被阻塞
ConcurrentHashMap的結構基本上和HashMap類似,都是陣列+連結串列+紅黑的方式:
陣列
是雜湊表查詢效率可以達到o(1)的關鍵
根據key的hash對陣列大小取模得到下標index,然後根據下標快速定位到桶,利用了陣列定址快的優點
連結串列
作用是解決hash衝突,發生hash衝突的節點就放在對應下標的連結串列 or 紅黑樹中
hash衝突:
hash然後對陣列大小取模的操作可以看作一個函數
這個函數的輸入時無窮的,但是輸出是介於0~陣列大小的,那麼必然存在多個key最後落在同一個下標的情況
紅黑樹
解決連結串列遍歷時間複雜度o(n)的問題,使用紅黑樹利用二分搜尋的性質進行優化。
紅黑樹本質是平衡二元搜尋樹,平衡表示左子樹右子樹規模儘量接近,左子樹小於父節點,父節點大於右子樹的特徵意味著可以使用log n的時間複雜度進行搜尋,這個大小的比較優先使用Comparable型別key的compareTo方法,但是如果key不可比較那麼將使用其hash值大小做比較。
在上圖中鎖的標誌可以知道,ConcurrentHashMap鎖的粒度,其鎖只鎖一個陣列下標的元素,相比於Hashtable直接鎖住整個雜湊表,其鎖的粒度更小,所以說ConcurrentHashMap具備更高的並行度。
這裡只分析比較重要的常數,類似於為了相容之前版本的常數不做列舉
名稱 | 值 | 含義 |
---|---|---|
MAXIMUM_CAPACITY | 1 << 30 | 陣列最大大小 |
DEFAULT_CAPACITY | 16 | 預設容量大小 |
LOAD_FACTOR | 0.75 | 負載因子,1.8的負載因子為0.75,假如容量為8,那麼當元素個數達到6的時候會發生擴容,擴容到原大小兩倍 |
TREEIFY_THRESHOLD | 8 | 樹化閾值,指定桶位 連結串列長度達到8的話,有可能發生樹化操作 |
UNTREEIFY_THRESHOLD | 6 | 紅黑樹轉化為連結串列的閾值 |
MIN_TREEIFY_CAPACITY | 64 | 接合TREEIFY_THRESHOLD控制桶位是否樹化, 只有當table陣列長度達到64且 某個桶位 中的連結串列長度達到8,才會真正樹化 |
MIN_TRANSFER_STRIDE | 16 | 執行緒遷移資料最小步長,控制執行緒遷移任務最小區間一個值 |
NCPU | ? | 當前系統的cpu核數 |
RESIZE_STAMP_BITS | 16 | sizeCtl中用於生成擴容戳的位數(擴容時詳細講解) |
屬性 | 作用 |
---|---|
transient volatile Node<K,V>[] table | 雜湊表,長度一定是2次方數 |
private transient volatile Node<K,V>[] nextTable | 擴容過程中,會將擴容中的新table 賦值給nextTable 保持參照,擴容結束之後,這裡會被設定為Null |
private transient volatile long baseCount | 實現ConcurrentHashMap計數的變數 |
private transient volatile int sizeCtl | ConcurrentHashMap中最複雜的變數,不同語境下具備不同作用,後面使用時介紹 |
private transient volatile int transferIndex | 擴容過程中,記錄當前進度。所有執行緒都需要從transferIndex中分配區間任務,去執行自己的任務。 |
private transient volatile CounterCell[] counterCells | ConcurrentHashMap實現計數的變數 |
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
//如果初始容量大於 最大容量(1 << 30)的一半,那麼直接使用最大容量,
//如果使用 tableSizeFor((初始容量 + 初始容量/2)+1) = 大於(1.5初始容量+1)的2的倍數
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
/**
* sizeCtl > 0
* 當目前table未初始化時,sizeCtl表示初始化容量
*/
this.sizeCtl = cap;
}
這裡有趣的是,構造方法並不導致陣列的初始化,這點類似HashMap,只有往ConcurrentHashMap中塞元素的才會初始化陣列
使用位元運算 返回大於等於c的最小的2的次方數
假如c為1297 n就是1296
1296的二進位制 0000 0101 0001 0000
右移1位 0000 0010 1000 1000
或操作 0000 0111 1001 1000
右移2位 0000 0001 1110 0110
或操作 0000 0111 1111 1111
右移4位元 0000 0000 0111 1111
或操作 0000 0111 1111 1111
。。。右移 16位元就讓32位元樹最高位及以後的都為1
最後進行+1 求出不小於它的最接近的2的整數冪m
n<0 說明當前數太大了 32位元全是1 已經超過了int 最大,陣列不可以開闢這麼大
put方法是看懂ConcurrenHashMap的重點,此方法會被多個執行緒並行呼叫,那麼douglea 是如何並行問題的暱?
執行緒安全且低粒度鎖的插入 -> 熱點資料分離的統計ConcurrentHashMap元素個數 -> 並行擴容遷移
put方法直接呼叫了putVal,其中第三個參數列示只有在存在相同key的時候才插入,這裡為false說明覆蓋(之前存在key=1,value=2,這時候插入key=1,value=3.那麼會覆蓋為3)。下面我們大致看下putVal方法的流程
下面我們分析為什麼需要使用高16位元進行互斥或操作,為什麼這種操作可以讓key雜湊到陣列中更均勻(這裡和HASH_BITS進行且操作的目的是讓hash值恆定為一個正數,因為再ConcurrentHashMap中負數hash值具備特殊意義,HashMap中則沒有這個特殊處理)
key是雜湊到指定陣列下標的
這裡很巧妙的利用了陣列長度是2的倍數的特點,2的倍數-1意味著低位連續x個1,對其進行且操作就是隻保留原hash值的低x位等於取模
為什麼需要使用高16位元進行互斥或操作,為什麼這種操作可以讓key雜湊到陣列中更均勻
假設插入key的hash滿足一個規律 hash = 6000整數倍+3,且陣列的長度為8
hash | 對應二進位制 | 且上(陣列長度-1 = 7 =0111) |
---|---|---|
6003 | 0001011101110 011 | 011 = 3 |
60003 | 1110101001100 011 | 011 = 3 |
600003 | 10010010011111000 011 | 011 = 3 |
6000003 | 010110111000110110000 011 | 011 = 3 |
可以看到最終這些key都將落在陣列下標3中,這是為什麼暱?
因為上面的i * 60000對8取餘肯定是0 剩下的結果只能是3對8取餘 導致結果一直是3
因為hash 高位的值沒有參與計算
如果使用高16位元進行互斥或,得到數位的低位16位元包含了原高16位元的特徵,從而讓高位也參與計算,讓雜湊更均勻
hash | 對應二進位制 | hash ^ (hash>>>16) | 且上(陣列長度-1 = 7 =0111) |
---|---|---|---|
6003 | 0001011101110 011 | 0001011101110 011 | 011 = 3 |
60003 | 1110101001100 011 | 1110101001100 011 | 011 = 3 |
600003 | 10010010011111000 011 | 011000000000000000000010 | 010 = 2 |
6000003 | 010110111000110110000 011 | 010110111000110111011000 | 000 = 0 |
可以看到 600003,6000003
被雜湊到2和0,避免了這一類資料都雜湊到下標3導致hash衝突劇烈的問題。
上述高16位元互斥或的做法有什麼缺點和優點
缺點
根據上面表格的例子可以看到 6003,60003
由於這些數位二進位制高16位元全為0,所以最終還是都雜湊到3,並沒有被很好的離散開。
可以學習redis 使用Crc16對key進行雜湊,可以讓雜湊更均勻
優點
時間複雜度低,如果使用md5,或者Crc16等演演算法進行雜湊,其時間成本將更大。
這種情況發生在ConcurrentHashMap第一次塞元素的情況,會呼叫initTable對陣列進行初始化,和HashMap不同點在於,這個initTable存在多個執行緒並行呼叫的可能,下面我們看看doug lea如何保證這一步的執行緒安全,和執行緒可見性(執行緒A初始化了陣列,執行緒B必須馬上可見)
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
//自選,只要陣列沒有初始化
while ((tab = table) == null || tab.length == 0) {
//sizeCtl<0 說明其他執行緒正在初始化陣列
if ((sc = sizeCtl) < 0)
//當前執行緒只需要放棄cpu 靜靜等待
Thread.yield(); // lost initialization race; just spin
//cas設定sizeCtl 為-1 cas成功表示當前執行緒成功上鎖,由當前執行緒進行初始化table
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
//再次檢查,避免其他執行緒初始化了,當前執行緒還進行初始化導致前面執行緒put的元素丟失
//因為下面finally 會釋放鎖,釋放的一瞬間可能其他執行緒成功cas為-1,從而讓前面初始化執行緒put的內容丟失
if ((tab = table) == null || tab.length == 0) {
//sc 大於0 這是使用者在構造方法中設定的大小,如果使用者沒有設定那麼使用預設大小
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
//new 一個陣列 並且賦值給table
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//n為當前陣列的大小
//n - (1/4*n) = 3/4 n 也就是負載因子0.75 * n
sc = n - (n >>> 2);
}
} finally {
//釋放鎖
sizeCtl = sc;
}
//結束自旋
break;
}
}
//返回初始化後的陣列
return tab;
}
總體上是利用sizeCtl做了一把鎖,如果沒有搶佔到鎖,那麼讓出cpu,這裡沒有使用synchronized
的原因應該是:初始化一個陣列是一個很快的過程,如果把其他執行緒掛起,初始化完後還需要進行喚醒,掛起和喚醒需要從使用者態切換到核心態,為了一個短暫的初始化而進行掛起和喚醒是得不償失的。
並且table屬性是volatile修飾的屬性,保證了可見性
程式碼中還有一點非常有趣
可以看到doug lea使用cas的方式保證了多個執行緒插入元素到先前不具備元素的桶中,只有一個執行緒會成功,成功的執行緒將結束自旋,失敗的執行緒將繼續自旋
這裡還有一個細節,那就是如何保證獲取陣列元素的執行緒可見性,table陣列雖然被volatile修飾,但是table中元素怎麼保證可見性:
可以看到,獲取table元素的方式是 Unsafe.getObjectVolatile(陣列元素的地址)
的方式,這種方式會 在讀指令前插入讀屏障,可以讓快取記憶體中的資料失效,重新從主記憶體載入資料
這裡先暫時忽略helpTransfer 幫助擴容遷移的內容,後面講到擴容我們再看看doug lea如何實現幫助擴容遷移的
final V putVal(K key, V value, boolean onlyIfAbsent) {
//不可存null key 和null value
if (key == null || value == null) throw new NullPointerException();
// 對key的hash進行擾動,是高16位元參與計算,讓雜湊更均勻
int hash = spread(key.hashCode());
//如果插入到連結串列中的那麼記錄連結串列中的下標,
//如果插入到紅黑樹那麼恆定為2,
//如果沒有發生hash衝突(沒有插入到連結串列也沒插入到樹)那麼為0
int binCount = 0;
//自旋
for (Node<K,V>[] tab = table;;) {
//忽略無關程式碼。。。
else {
//進入這個位置的前提
//1.陣列初始化了
//2.當前key hash對陣列取模得到下標後,根據下標到陣列中拿到對應的節點不是空(不是第一個插入在這個位置的元素)
//3 陣列在當前插入位置沒有進行擴容遷移
//舊值
V oldVal = null;
//加鎖 鎖的是頭節點(可能是連結串列,也可能是樹節點,等其他節點)
synchronized (f) {
// 重新獲取以下,避免其他執行緒修改了頭節點,從而鎖錯,如果其他執行緒修改了頭,那麼什麼也不做,繼續自選
if (tabAt(tab, i) == f) {
//當前key hash對陣列取模得到下標後,根據下標到陣列中拿到對應的節點的hash值 大於0,說明是連結串列節點,因為樹節點的hash恆定-2
if (fh >= 0) {
//記錄在連結串列中的位置
binCount = 1;
//遍歷連結串列中每一個節點
for (Node<K,V> e = f;; ++binCount) {
// ek記錄當前遍歷節點的key
K ek;
//當前遍歷節點kash = 企圖插入key的hash
//且key是同一個物件 或者key equals為true 說明找到插入的位置
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
//記錄舊值
oldVal = e.val;
//如果不是不存在才插入(put方法onlyIfAbsent=false,會進行值替換)
if (!onlyIfAbsent)
//替換值
e.val = value;
//插入成功那麼結束自選
break;
}
//遍歷節點的前一個節點
Node<K,V> pred = e;
//e是當前遍歷的節點,如果當前節點的下一個節點為null
//此處還會讓e = e.next 相當於讓遍歷節點向後移動
if ((e = e.next) == null) {
//到這兒說明 當前key和連結串列中每一個節點的key都不同(不能==,也不能equals)
//那麼將當前插入的值包裝成node 掛到連結串列末尾
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//當前key hash對陣列取模得到下標後,根據下標到陣列中拿到對應的節點 是一個樹節點
else if (f instanceof TreeBin) {
Node<K,V> p;
//恆定binCount為2
binCount = 2;
//紅黑樹的插入 p為插入過程中key ==或者equals的節點,
//說明存在重複的節點 p是被覆蓋的節點
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
//記錄舊值
oldVal = p.val;
//如果不是不存在才插入(put方法onlyIfAbsent=false,會進行值替換)
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
//binCount 如果插入下標對應的第一個節點是空(當前是第一個插入到這個位置的)才為0
//插入連結串列中 binCount記錄插入到連結串列中的下標
//如果插入到紅黑中,binCount恆定為2
if (binCount != 0) {
//大於樹化的閾值,那麼進行樹化(內部會要求陣列長度大於64)
if (binCount >= TREEIFY_THRESHOLD)
//樹化
treeifyBin(tab, i);
//如果之前相同key的元素存在,返回舊的key
if (oldVal != null)
return oldVal;
break;
}
}
}
//計數,會記錄ConcurrentHashMap中的元素個數
//如果超過擴容閾值 還會進行擴容遷移
addCount(1L, binCount);
return null;
}
這部分程式碼沒有很複雜,使用synchronized
鎖住頭節點保證執行緒安全,但是即使加了鎖,doug lea居然還會使用if (tabAt(tab, i) == f)
再次進行校驗,這是為什麼暱?
統計數量和並行擴容都交給putVal最後的addCount方法,如果是覆蓋了原有key value或者沒有插入到ConcurrentHashMap那麼將不進行addCount(1L, binCount)
,因為沒有新增元素。
統計元素個數的難點
如果addCount被並行呼叫且只使用一個屬性記錄元素個數,我們可以使用下列方案統計數量
使用獨佔鎖
這種方案的缺點,多個執行緒並行插入最終都將受限於統計數量,這一步只能序列,這和ConcurrentHashMap主打一個並行是相悖的
使用AtomicLong
這種方案的缺點在於,假如當前有100個執行緒,執行緒1cas修改數目成功,將導致執行緒2~100失敗繼續自旋,執行緒2成功將導致其餘執行緒失敗自旋,導致執行緒進行多次無意義的自旋,把珍貴的cpu資源浪費在自旋上。這是熱點資料更新
導致的問題
熱點資料分離的方案
使用LongAdder
ConcurrentHashMap沒有直接使用LongAdder,幾乎是一樣的程式碼再寫了一次,可能doug lea覺得ConcurrentHashMap只需要加1這種操作,不需要所有LongAdder所有功能從而進行了獨立實現
JUC原始碼學習筆記4——原子類,CAS,Volatile記憶體屏障,快取偽共用與UnSafe相關方法 - Cuzzz - 部落格園 (cnblogs.com)這篇部落格詳細講解了LongAdder的原理——熱點資料分離
其實和分表思想類似,將熱點資料分散到多出,從而減少熱點資料更新的問題。
這種方案雖好但是有另外一個問題——統計熱點資料之和不具備瞬時一致性,想象我們需要遍歷完所有熱點資料計算其和,可能遍歷完A,再遍歷B的時候,A被修改了。
魚和熊掌不可兼得
並行擴容遷移的難點
如何讓多個執行緒協調合作,如何讓遷移的過程不阻塞讀,如何保證多執行緒遷移的執行緒安全問題
addCount方法很長,我們分兩部分講解
增加CocurrentHashMap元素數量,並統計總數
這部分需要LongAdder的基礎,推薦學習《JUC原始碼學習筆記4——原子類,CAS,Volatile記憶體屏障,快取偽共用與UnSafe相關方法 - Cuzzz - 部落格園 (cnblogs.com)》
//as 表示 LongAdder.cells
//b 表示LongAdder.base
//s 表示當前map.table中元素的數量
CounterCell[] as; long b, s;
//條件一:(as = counterCells) != null
//true->表示cells已經初始化了,當前執行緒應該去使用hash定址找到合適的cell 去累加資料
// false->表示當前執行緒應該將資料累加到 base
//條件二:!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)
//false->表示寫base成功,資料累加到base中了,當前競爭不激烈,不需要建立cells
//true->表示寫base失敗,與其他執行緒在base上發生了競爭,當前執行緒應該去嘗試建立cells。
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
//有幾種情況進入到if塊中?
//1.true->表示cells已經初始化了,當前執行緒應該去使用hash定址找到合適的cell 去累加資料
//2.true->表示寫base失敗,與其他執行緒在base上發生了競爭,當前執行緒應該去嘗試建立cells。
//a 表示當前執行緒hash定址命中的cell
CounterCell a;
//v 表示當前執行緒寫cell時的期望值
long v;
//m 表示當前cells陣列的長度
int m;
//true -> 未競爭 false->發生競爭
boolean uncontended = true;
//條件一:as == null || (m = as.length - 1) < 0
//true-> 表示當前執行緒是通過 寫base競爭失敗 然後進入的if塊,就需要呼叫fullAddCount方法去擴容 或者 重試.. LongAdder.longAccumulate
//條件二:a = as[ThreadLocalRandom.getProbe() & m]) == null 前置條件:cells已經初始化了
//true->表示當前執行緒命中的cell表格是個空,需要當前執行緒進入fullAddCount方法去初始化 cell,放入當前位置.
//條件三:!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)
// false->取反得到false,表示當前執行緒使用cas方式更新當前命中的cell成功
// true->取反得到true,表示當前執行緒使用cas方式更新當前命中的cell失敗,需要進入fullAddCount進行重試 或者 擴容 cells。
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
) {
//這裡面涉及到Cell陣列的擴容,自旋計數
//和LongAdder程式碼一模一樣
fullAddCount(x, uncontended);
return;
}
//只有put插入的時候,key對應table位置的元素為空,且cas成功 這時候check為0
//或者刪除的時候 check為-1
//也就是說是這個桶中第一次插入,沒有發生hash衝突,doug lea 認為這種情況說明桶中元素並不多
//或者說是刪除操作,刪除自然不需要進行擴容
//所以直接返回 不會進行擴容
if (check <= 1)
return;
//記錄當前ConcurrentHashMap中有多少個元素 這個過程會遍歷Cell陣列
//因此不具備瞬時一致性
s = sumCount();
}
....下面為並行擴容遷移
擴容過程中需要記錄當前擴容批次,比如從陣列長度16擴容到陣列長度32會為擴容生成一個唯一標識
並且還會記錄有多少個執行緒在進行擴容,這兩部分資訊用sizeCtl一併記錄
高16位元表示:擴容的標識戳(方法resizeStamp(當前資料大小)生成)
低16位元表示:(1 + 參與擴容的執行緒數)
可以看到resizeStamp生成批次或上(擴容執行緒數+1)一定是負數==>sizeCtl如果是負數表示當前正在進行擴容
//s 是上面統計Cell陣列得到的ConcurrentHashMap元素個數
//刪除情況呼叫addCount的時候check是負數,這裡標識是compute 或者put方法新增了元素
if (check >= 0) {
//tab 表示map.table
//nt 表示map.nextTable
//n 表示map.table陣列的長度
//sc 表示sizeCtl的臨時值
Node<K,V>[] tab, nt; int n, sc;
//自旋
//條件一:s >= (long)(sc = sizeCtl)
// true-> 1.當前sizeCtl為一個負數 表示正在擴容中..
// 2.當前sizeCtl是一個正數,表示擴容閾值
// false-> 表示當前table尚未達到擴容條件
//條件二:(tab = table) != null
// 恆成立 true
//條件三:(n = tab.length) < MAXIMUM_CAPACITY
// true->當前table長度小於最大值限制,則可以進行擴容。
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
//擴容批次唯一標識戳,使用位元運算生成,擴容唯一標識
//16 -> 32 擴容 標識為:1000 0000 0001 1011
int rs = resizeStamp(n);
//條件成立:表示當前table正在擴容
// 當前執行緒理論上應該協助table完成擴容
if (sc < 0) {
//條件一:(sc >>> RESIZE_STAMP_SHIFT) != rs
// true->說明當前執行緒獲取到的擴容唯一標識戳 非 本批次擴容
//可能當前執行緒長時間沒有獲得時間片,以至於其他執行緒都擴容結束,進行下一輪擴容了
//當前執行緒才姍姍來遲
// false->說明當前執行緒獲取到的擴容唯一標識戳 是 本批次擴容
//條件二: JDK1.8 中有bug jira已經提出來了 其實想表達的是 = sc == (rs << 16 ) + 1
// true-> 表示擴容完畢,當前執行緒不需要再參與進來了
// false->擴容還在進行中,當前執行緒可以參與
//條件三:JDK1.8 中有bug jira已經提出來了 其實想表達的是 = sc == (rs<<16) + MAX_RESIZERS
// true-> 表示當前參與並行擴容的執行緒達到了最大值 65535 - 1
// false->表示當前執行緒可以參與進來
//條件四:(nt = nextTable) == null
//nextTable在完成擴容後會賦值給table,然後nextTable置為null
// true->表示本次擴容結束
// false->擴容正在進行中
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
//前置條件:當前table正在執行擴容中.. 當前執行緒有機會參與進擴容。
//條件成立:說明當前執行緒成功參與到擴容任務中,並且將sc低16位元值加1,表示多了一個執行緒參與工作
//條件失敗:1.當前有很多執行緒都在此處嘗試修改sizeCtl,有其它一個執行緒修改成功了,導致你的sc期望值與記憶體中的值不一致 修改失敗
// 2.transfer 任務內部的執行緒也修改了sizeCtl。
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
//協助擴容執行緒,持有nextTable引數
transfer(tab, nt);
}
//1000 0000 0001 1011 0000 0000 0000 0000 +2 => 1000 0000 0001 1011 0000 0000 0000 0010
//條件成立,說明當前執行緒是觸發擴容的第一個執行緒,在transfer方法需要做一些擴容準備工作
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
//觸發擴容條件的執行緒 不持有nextTable
transfer(tab, null);
s = sumCount();
}
}
可以看到擴容操作在transfer函數中進行,addCount會負責維護sizeCtl,並且讓當前執行緒發起擴容,或者參與擴容,我們關注下發起和參與是如何區分的
可以看到addCount在並行擴容這一步相當於是一個排程者,在自旋中讓當前執行緒,要麼發起(只有一個幸運兒可以發起)要麼加入,要麼擴容結束當前執行緒break
接下來我們看看transfer是如何讓多個執行緒有條不紊進行擴容的
假如你現在有100個手下,你需要安排他們一起做一件事情(擴容遷移),且這一百個手下能力一樣(CPU分時排程,誰也別瞧不起誰)你要如何協調他們暱?
分配任務
遷移過程中如何告訴讀執行緒,這個陣列下標的元素已經被遷移走了,你去新表中讀吧,儘量不要阻塞讀
如何告訴寫執行緒,當前在擴容遷移,讓寫執行緒參與進來幫忙一起遷移?
這裡Doug lea使用 ForwardingNode
來實現新陣列的路由,和讓寫執行緒根據特定節點hash值,來參與擴容
如何同步多個執行緒遷移的進度
這裡使用全域性屬性transferIndex記錄遷移的精度,使用volatile修飾,實現進度對多個執行緒可見
transfer這段程式碼很妙,很長,我們分段看
任務分配和多執行緒間的協調和一些前置操作
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
//n 表示擴容之前table陣列的長度
//stride 表示分配給執行緒任務的步長
int n = tab.length, stride;
//MIN_TRANSFER_STRIDE = 16
//如果我們具備多核CPU 且 陣列長度/8/cpu核心數 > 16
// 那麼 每一個執行緒分配 【陣列長度/8/cpu核心數】個連續陣列單元進行擴容遷移
//NCPU > 1) ? (n >>> 3) / NCPU : n
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
//條件成立:表示當前執行緒為觸發本次擴容的執行緒,需要做一些擴容準備工作
//條件不成立:表示當前執行緒是協助擴容的執行緒..
if (nextTab == null) { // initiating
try {
//建立了一個比擴容之前大一倍的table
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
//賦值給物件屬性 nextTable ,方便協助擴容執行緒 拿到新表
nextTable = nextTab;
//記錄遷移資料整體位置的一個標記。
// 從1開始計算的,不是從0開始,也就是說transferIndex = n 表示從n-1開始遷移直到0
transferIndex = n;
}
//表示新陣列的長度
int nextn = nextTab.length;
//fwd 節點,當某個桶位資料處理完畢後,將此桶位設定為fwd節點,
//fwd節點持有nextTable的參照,且其hash恆定位MOVED = -1
//如果執行緒企圖寫到這個單元格那麼會加入擴容,如果嘗試讀那麼將被轉移到新陣列中進行讀
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
//推進標記
boolean advance = true;
//完成標記
boolean finishing = false; // to ensure sweep before committing nextTab
//i 表示分配給當前執行緒任務,執行到的桶位,從0開始計數,
// 也就是說如果i=10 表示當前執行緒要遷移i-stride~i的陣列單元格
//bound 表示分配給當前執行緒任務的下界限制
int i = 0, bound = 0;
//自旋
for (;;) {
//f 桶位的頭結點
//fh 頭結點的hash
Node<K,V> f; int fh;
//下面的程式碼要進行
//1.給當前執行緒分配任務區間
//2.維護當前執行緒任務進度(i 表示當前處理的桶位)
//3.維護ConcurrentHashMap物件全域性範圍內的進度
while (advance) {
//分配任務的開始下標
//分配任務的結束下標
int nextIndex, nextBound;
//CASE1:
//條件一:--i >= bound
//成立:表示當前執行緒的任務尚未完成,還有相應的區間的桶位要處理,--i 就讓當前執行緒處理下一個 桶位.
//不成立:表示當前執行緒任務已完成 或 者未分配
if (--i >= bound || finishing)
advance = false;
//CASE2:
//前置條件:當前執行緒任務已完成 或 者未分配
//條件成立:表示物件全域性範圍內的桶位都分配完畢了,沒有區間可分配了,
// 設定當前執行緒的i變數為-1 跳出迴圈後,執行退出遷移任務相關的程式
//條件不成立:表示物件全域性範圍內的桶位尚未分配完畢,還有區間可分配
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
//CASE3:
//前置條件:1、當前執行緒需要分配任務區間 2.全域性範圍內還有桶位尚未遷移
//條件成立:說明給當前執行緒分配任務成功
//條件失敗:說明分配給當前執行緒失敗,應該是和其它執行緒發生了競爭吧
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
//CASE1:
//條件一:i < 0
//成立:表示當前執行緒未分配到任務
if (i < 0 || i >= n || i + n >= nextn) {
//儲存sizeCtl 的變數
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//條件成立:說明設定sizeCtl 低16位元 -1 成功,當前執行緒可以正常退出
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//1000 0000 0001 1011 0000 0000 0000 0000
//條件成立:說明當前執行緒不是最後一個退出transfer任務的執行緒
//如果當前執行緒是最後一個那麼 sizeCtl 前16位元是擴容標識戳,
//後16位元是 0000 0000 0010 = 2 所以這裡減2 就是拿到 高16位元為擴容戳,低16位元全0的數
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
//正常退出
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
//前置條件:【CASE2~CASE4】 當前執行緒任務尚未處理完,正在進行中
//CASE2:
//條件成立:說明當前桶位未存放資料,只需要將此處設定為fwd節點即可。
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//CASE3:
//條件成立:說明當前桶位已經遷移過了,當前執行緒不用再處理了,直接再次更新當前執行緒任務索引,再次處理下一個桶位 或者 其它操作
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
//CASE4:
//前置條件:當前桶位有資料,而且node節點 不是 fwd節點,說明這些資料需要遷移。
else {
//下面是真正進行遷移
nextTable的初始化
這可以看到擴容陣列大小是原陣列的兩倍(n<<1)但是doug lea 不害怕這裡nextTable重複初始化導致遷移工作白做的問題麼?其實在呼叫transfer方法的地方都有如下這段程式碼做控制
可以看到如果nextTable為null,那麼其他執行緒會視當前擴容還沒有啟動,從而break出去!doug lea之所以這麼做主要是為了實現nextTable初始化和其他執行緒寫操作的非同步,如果第一個執行緒沒有初始化nextTable 那麼其他執行緒break掉自旋,讓寫操作不要等待這個初始化nextTable的操作,狠狠的壓榨CPU!!! 臥槽 doug lea您是真的牛逼!
自旋+cas分配任務
真正進行遷移
這一段程式碼最妙的點,在對於最後一個執行緒的判斷,校驗sc-2低16位元是否全部為0的同時,還是保證了 當前執行緒是這一批次的擴容執行緒,並且對於最後一個執行緒,它將負責修改table 為新陣列,修改nextTable = null
其中如果遷移了null格子,或者遷移到已經被遷移的格子,advance會賦值位true,將繼續分配格子進行遷移
接下來就是使用synchronized,實現元素的遷移
//synchronized 加鎖當前桶位的頭結點
synchronized (f) {
//這裡和put 一樣會
if (tabAt(tab, i) == f) {
//ln 表示低位連結串列參照
//hn 表示高位連結串列參照
Node<K,V> ln, hn;
//條件成立:表示當前桶位是連結串列桶位
if (fh >= 0) {
//lastRun機制
//可以獲取出 當前連結串列 末尾連續高位不變的 node
int runBit = fh & n;
Node<K,V> lastRun = f;
//從頭開始,
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
//確保為連續的高位,或者為連續的低位
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
//條件成立:說明lastRun參照的連結串列為 低位連結串列,那麼就讓 ln 指向 低位連結串列
if (runBit == 0) {
ln = lastRun;
hn = null;
}
//否則,說明lastRun參照的連結串列為 高位連結串列,就讓 hn 指向 高位連結串列
else {
hn = lastRun;
ln = null;
}
//注意這裡是new了一個節點,並沒有改變原本連結串列的結構
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//加鎖了下面沒有使用
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
//條件成立:表示當前桶位是 紅黑樹 代理結點TreeBin
else if (f instanceof TreeBin) {
//轉換頭結點為 treeBin參照 t
TreeBin<K,V> t = (TreeBin<K,V>)f;
//低位雙向連結串列 lo 指向低位連結串列的頭 loTail 指向低位連結串列的尾巴
TreeNode<K,V> lo = null, loTail = null;
//高位雙向連結串列 lo 指向高位連結串列的頭 loTail 指向高位連結串列的尾巴
TreeNode<K,V> hi = null, hiTail = null;
//lc 表示低位連結串列元素數量
//hc 表示高位連結串列元素數量
int lc = 0, hc = 0;
//迭代TreeBin中的雙向連結串列,從頭結點 至 尾節點(TreeNode其實有next和pre指標將樹節點串聯起來)
for (Node<K,V> e = t.first; e != null; e = e.next) {
// h 表示迴圈處理當前元素的 hash
int h = e.hash;
//使用當前節點 構建出來的 新的 TreeNode
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
//條件成立:表示當前迴圈節點 屬於低位鏈 節點
if ((h & n) == 0) {
//條件成立:說明當前低位連結串列 還沒有資料
if ((p.prev = loTail) == null)
lo = p;
//說明 低位連結串列已經有資料了,此時當前元素 追加到 低位連結串列的末尾就行了
else
loTail.next = p;
//將低位連結串列尾指標指向 p 節點
loTail = p;
++lc;
}
//當前節點 屬於 高位鏈 節點
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//如果元素個數小於 紅黑樹轉化為連結串列的閾值 6 那麼進行連結串列化
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
//同上
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
這裡最妙最妙最妙的就是lastRun這個機制,為什麼doug lea要從頭找到尾部連續個高位或者低位的節點
何為高位,低位
注意紅色箭頭指的這一位,由於陣列的長度是2的倍數,所以結尾是全0,存在一位為1,和其他key的hash值進行且操作 = 獲取這個key在紅色箭頭這一位是0 還是 1,如果是1視作高位,反之視為低位。當前執行緒在遷移位於x位置的節點,那麼高位節點們遷移後將位於新表的x+n位置,低位節點遷移後還是位於新表的x位置。下面解釋這一點
如果一個數原本在紅色箭頭位置是1,那麼和擴容後的數進行且操作的時候,藍色箭頭位置這一位必然也是1,藍色箭頭之後的位置也是不變的,對於高位的key且操作之後,藍色箭頭位置是1,對於低位的key且 操作後藍色位置是0。為1可以看作實在原本橙色基礎上加上了一個64,為0可以看作是在原本橙色基礎上加了0
lastRun存在的必要,為什麼要這麼做?
減少new Node物件的操作
這裡原本就有一部分連續的高位節點,直接拿到綠色節點的作為高位的頭即可。但是不能切斷原本連結串列中任何的節點的連線指標,也就是說如果想把藍色框框這部分節點進行遷移,需要new Node,為什麼暱?
因為並行遷移的過程中,可能存在其他執行緒正在讀這個連結串列,如果我們斷開了之前節點的連線將導致其他執行緒找不到原本連結串列中的節點,doug lea後面也確實是使用new的方式,這樣相當於給讀執行緒一個一致性檢視(HashMap則沒有這個策略,直接把原節點拆開兩部分)