Java集合

2023-01-14 21:00:45

介紹 ConcurrentHashMap

技術是為了解決問題而生的,ConcurrentHashMap 解決了多個執行緒同時操作一個 HashMap 時,可能出現的內部問題。當多個執行緒同時操作一個 HashMap 時,有可能會出現多執行緒同時修改一個共用變數(HashMap 類的成員變數),導致資料被覆蓋,產生意想不到的錯誤。


ConcurrentHashMap 內部使用了鎖和各種資料結構來保證存取 Map 是執行緒安全的。

ConcurrentHashMap 和 HashMap 底層的陣列、連結串列結構幾乎相同,底層對資料結構的操作思路是相同的。ConcurrentHashMap 除了陣列 + 連結串列 + 紅黑樹的基本結構外,新增了轉移節點結構(ForwardingNode)。


介紹轉移節點(ForwardingNode)

轉移節點是 ForwardingNode 結構, ForwardingNode 繼承了 Node。ForwardingNode 節點的 hash 值固定為 -1。ForwardingNode 比 Node 多了一個 nextTable 成員變數,nextTable 成員變數的型別是 Node 陣列。nextTable 成員變數是擴容之後的新陣列。

如果陣列在索引 i 上的結構是 ForwardingNode,那麼表示這個雜湊桶內的全部節點都已經轉移到擴容之後的新陣列,舊的雜湊桶內的資料不能發生改變。轉移節點(ForwardingNode)是為了保證 ConcurrentHashMap 擴容時的執行緒安全。保證了當一個雜湊桶內的全部節點都已經轉移到擴容之後的新陣列後、擴容操作完成之前,舊的雜湊桶內的資料不發生變化。

當一個雜湊桶內的全部節點都已經轉移到擴容之後的新陣列後、擴容操作完成之前,如果有其他的執行緒執行 put 操作,需要將新增的節點 put 到舊的雜湊桶內,那麼這個執行緒會呼叫 helpTransfer() 方法幫助擴容。擴容完成之後,這個執行緒再將要新增的節點 put 到新的雜湊桶內。

ConcurrentHashMap 的新增操作

ConcurrentHashMap 在 put 方法上對資料結構的操作思路和 HashMap 相同,但 ConcurrentHashMap 的 put() 方法寫了很多保障執行緒安全的程式碼。當呼叫 ConcurrentHashMap 的 put() 方法時,put() 方法的處理邏輯如下:

  • 首先,如果 CHM 的成員變數 table 陣列為空(null 或者 length 為 0),則呼叫 initTable() 方法初始化 table 陣列。由於 initTable() 方法操作了共用變數,因此 initTable() 方法採用了一些手段來保證執行緒安全。
  • 接下來,它會呼叫 spread() 方法根據 key 計算出 hash 值,然後根據計算出的 hash 值計算出 key 對應的陣列索引 i
  • 計算出 key 對應的陣列索引 i 之後,它呼叫 tabAt() 方法,tabAt() 方法返回陣列在索引 i 上的值。然後它根據陣列在索引 i 上的值進行處理。由於 tabAt() 方法讀取了共用變數 table 陣列在索引 i 上的值,因此 tabAt() 方法呼叫 Unsafe 類的 get 方法保證資料的可見性
    • 如果陣列在索引 i 上的值為 null,則直接生成一個新的節點,並呼叫 casTabAt() 方法讓 tab[i] 指向該節點。由於 casTabAt() 方法操作了共用變數 tab 陣列,因此 casTabAt() 方法呼叫 Unsafe 類的 compareAndSwap 方法保證資料不被覆蓋
    • 如果陣列在索引 i 上的值不為 null,則意味著需要解決 hash 衝突問題、擴容衝突問題。
  • 接上一步驟,如果陣列在索引 i 上的值不為 null。
    • 如果索引 i 上的結構是轉移節點(ForwardingNode 結構,節點的 hash 值為 -1,表明這個雜湊桶內的全部節點都已經轉移到擴容之後的新陣列,舊的雜湊桶內的資料不能發生改變),它就會呼叫 helpTransfer() 方法,helpTransfer() 方法會幫助擴容。擴容完成之後,它再將要新增的節點 put 到擴容之後的新陣列中。
    • 如果索引 i 上的結構不是轉移節點,那麼它會使用 synchronized 關鍵字給索引 i 上的結構加鎖,保證同時最多隻有一個執行緒操作索引 i 上的結構。給索引 i 上的結構加鎖之後,它會判斷陣列在索引 i 上的結構是連結串列 還是 紅黑樹,然後呼叫相應的新增程式碼。
      • 如果索引 i 上的結構是連結串列,則把新生成的節點加到連結串列的末尾;
      • 如果索引 i 上的結構是紅黑樹,那麼使用紅黑樹方式新增。
  • 接上一步驟,如果索引 i 上的結構是普通連結串列,則把新生成的節點加到連結串列的末尾之後,需要判斷是否需要將連結串列轉為紅黑樹:
    • 如果連結串列的長度大於等於 8,並且陣列的長度大於等於 64,則呼叫 treeifyBin() 將連結串列轉為紅黑樹;
    • 如果連結串列的長度大於等於 8,但是陣列的長度小於 64,則呼叫 tryPresize() 方法執行擴容操作;
    • 當紅黑樹中的節點個數小於等於 6 時,紅黑樹會轉為連結串列。
  • 將節點加入 CHM 集合之後,put() 方法的最後一步是呼叫 addCount() 方法增加 ConcurrentHashMap 中元素個數的計數值。addCount() 方法的任務是:增加 ConcurrentHashMap 中元素的計數值。如果元素的數量超過了 ConcurrentHashMap 擴容的閾值(sizeCtl),那麼就會呼叫 transfer() 方法執行擴容操作。如果此時有其他的執行緒已經在執行擴容操作,那麼當前執行緒就協助擴容。

當呼叫 CHM 的 put() 方法時,如果 CHM 中已經存在要新增的 key,並且方法的入參 onlyIfAbsent 為 false,則替換舊值,並返回舊值。

ConcurrentHashMap 的擴容機制

ConcurrentHashMap 的擴容時機和 HashMap 相同,都是在 put() 方法的最後一步檢查是否需要擴容。ConcurrentHashMap 擴容的方法叫做 transfer(),從 put() 方法的 addCount() 方法進去,就能找到 transfer() 方法。

如果 ConcurrentHashMap 中元素的數量超過了擴容的閾值(sizeCtl),那麼它會呼叫 transfer() 方法執行擴容操作。ConcurrentHashMap 的擴容機制是擴容為原來容量的 2 倍。ConcurrentHashMap 擴容的處理邏輯和 HashMap 完全不同。


ConcurrentHashMap 擴容的大體思路如下:擴容需要把舊陣列上的全部節點轉移到擴容之後的新陣列上,節點的轉移是從陣列的最後一個索引位置開始,一個索引一個索引進行的。每個執行緒一輪處理有限個數的雜湊桶。當舊陣列上的全部節點轉移到擴容之後的新陣列後,ConcurrentHashMap 的 table 成員變數指向擴容之後的新陣列,擴容操作完成。transfer() 方法的處理邏輯如下:

  • 首先根據 CPU 核數和 table 陣列的長度,計算當前執行緒一輪處理雜湊桶的個數。ConcurrentHashMap 的 transferIndex 成員變數會記錄下一輪 或者是 下一個執行緒要處理的雜湊桶的索引值 + 1

    • 如果 CPU 核數為 1,那麼當前執行緒一輪處理雜湊桶的個數為 table 陣列的長度;
    • 如果 CPU 核數大於 1,那麼先計算 num = (tab.length >>> 3) / NCPU的值:
      • 如果 num 值大於等於 16,那麼 num 值就是當前執行緒一輪處理雜湊桶的個數;
      • 如果 num 值小於 16,那麼當前執行緒一輪處理雜湊桶的個數為 16。也就是說,執行緒一輪處理雜湊桶的個數最小值為 16。
  • 領取完任務之後執行緒就開始處理雜湊桶內的節點。節點的轉移是從陣列的最後一個索引位置開始,一個索引一個索引進行的。在轉移索引 i 上的節點之前,它會使用 synchronized 關鍵字給索引 i 上的結構加鎖,保證同時最多隻有一個執行緒操作索引 i 上的結構。

  • 給索引 i 上的結構加鎖之後,它會判斷陣列在索引 i 上的結構是連結串列 還是 紅黑樹,然後呼叫相應的節點轉移程式碼。

    • 如果索引 i 上的結構是連結串列,它通過將節點 key 的 hash 值 和 陣列的長度 n 做與運算獲得 n 對應的二進位制表示中的 1 這一位在 hash 值中是 0 還是 1,即 b = p.hash & n。獲取到 b 之後,用 b 來判斷要轉移的節點是要掛到低位雜湊桶裡,還是掛到高位雜湊桶裡。遍歷完連結串列,形成兩個連結串列(低位連結串列、高位連結串列)之後,將連結串列的頭節點賦值給對應的 tab[i]:

      • 如果 b 的值為 0,則要轉移的節點掛到位雜湊桶裡
      • 如果 b 的值非 0,則要轉移的節點掛到位雜湊桶裡
    • 如果索引 i 上的結構是紅黑樹那麼使用紅黑樹方式轉移節點。

  • 在將索引 i 上的全部節點轉移到擴容之後的新陣列後,它讓舊陣列 tab[i] 指向轉移節點(ForwardingNode)。

  • 當舊陣列上的全部節點轉移到擴容之後的新陣列後,ConcurrentHashMap 的 table 成員變數指向擴容之後的新陣列,擴容操作完成。

介紹低位雜湊桶、高位雜湊桶:如果 ConcurrentHashMap 當前的陣列長度為 n 時,一個節點的 key 對應的雜湊桶索引為 i。那麼 ConcurrentHashMap 擴容之後陣列長度為 2n 時,這個節點的 key 對應的低位雜湊桶的索引為 i,對應的高位雜湊桶的索引為 i + n。


ConcurrentHashMap 支援多執行緒擴容:

  • 如果在擴容的過程中,有其他的執行緒執行新增操作,新增操作完成後,這個執行緒會呼叫 transfer() 方法協助擴容。
  • 如果一個執行緒在擴容時,有其他的執行緒執行新增操作,需要把節點 put 到索引為 i 的雜湊桶內。其他的執行緒它發現索引 i 上的結構是轉移節點(ForwardingNode 結構, 節點的 hash 值為 -1,表明這個雜湊桶內的元素已經擴容遷移完成),那麼這個執行緒它就會呼叫 helpTransfer() 方法,helpTransfer() 方法會呼叫 transfer() 幫助擴容。擴容完成之後,它再將要新增的節點 put 到擴容之後的新陣列中。

ConcurrentHashMap 的查詢操作

當呼叫 ConcurrentHashMap 的 get() 方法時,get() 方法的處理邏輯如下:

  • 首先,它會根據傳入的 key 計算出 hash 值;然後根據計算出的 hash 值計算出 key 對應的陣列索引 i
  • 計算出 key 對應的陣列索引 i 之後,根據儲存位置,從陣列中取出對應的 Entry,然後通過 key 物件的 equals() 方法判斷傳入的 key 和 Entry 中的 key 是否相等:
    • 如果傳入的 key 和 Entry 中的 key 相等,則查詢操作完成,返回 Entry 中的 value;
    • 如果傳入的 key 和 Entry 中的 key 不相等,則再判斷陣列在索引 i 上的結構是連結串列 還是 紅黑樹,然後呼叫相應的查詢資料的方法。直到找到相等的 Entry 或者沒有下一個 Entry 為止。

ConcurrentHashMap 的容量大小問題

ConcurrentHashMap 的陣列長度總是為 2 的冪次方。不論傳入的初始容量是否為 2 的冪次方,最終都會轉化為 2 的冪次方。

ConcurrentHashMap 中根據 key 計算出 hash 值,然後根據計算出的 hash 值計算出 key 對應的陣列索引 i:

  • 根據 key 計算處 hash 值:在計算 hash 值時,它先將 key 的 hashCode 值無符號右移 16 位,然後再和 key 的 hashCode 值做 互斥或 運算,即 hash = (hashCode >>> 16) ^ hashCode
  • 根據 hash 值計算出 key 對應的陣列索引 i:在計算 key 對應的陣列索引 i 時,它將 hash 值 和 陣列的長度 - 1 做與運算獲得 key 對應的陣列索引 i,即 i = hash & (n - 1)

ConcurrentHashMap 的陣列長度總是為 2 的冪次方設計的非常巧妙:

  • 在計算 hash 值時,它先將 key 的 hashCode 值無符號右移 16 位,然後再和 key 的 hashCode 值做 互斥或 運算,即 hash = (hashCode >>> 16) ^ hashCode。使 key 的 hashCode 值高 16 位的變化對映到低 16 位中,使 hashCode 值高 16 位也參與後續索引 i 的計算,減少了碰撞的可能性。
  • 在計算 key 對應的陣列索引 i 時,它將 hash 值 和 陣列的長度 - 1 做與運算獲得 key 對應的陣列索引 i,即 i = hash & (n - 1)。由於陣列的長度 n 是 2 的冪次方,n - 1 可以保證它的二進位制表示中的後幾位都是 1,n 對應的二進位制位及之前的位都是 0。因此,計算出的陣列索引 i 和 hash 值的二進位制表示中後幾位有關,而與前面的二進位制位無關
  • 當 b 是 2 的冪次方時,a % b == a & (b - 1)。CPU 處理位運算比處理數學運算的速度更快,效率更高。
  • 在 ConcurrentHashMap 擴容時,它通過將 key 的 hash 值 和 陣列的長度 n 做與運算獲得 n 對應的二進位制表示中的 1 這一位在 hash 值中是 0 還是 1,即 b = p.hash & n。獲取到 b 之後,用 b 來判斷要轉移的節點是要掛到低位雜湊桶裡,還是掛到高位雜湊桶裡:
    • 如果 b 的值為 0,則要轉移的節點掛到位雜湊桶裡
    • 如果 b 的值非 0,則要轉移的節點掛到位雜湊桶裡

ConcurrentHashMap 的計數

當呼叫 ConcurrentHashMap 的 put() 方法時,put() 方法的最後一步是呼叫 addCount() 方法。

addCount() 方法的任務是:增加 ConcurrentHashMap 中元素的計數值。如果元素的數量超過了 ConcurrentHashMap 擴容的閾值(sizeCtl),那麼就會呼叫 transfer() 方法執行擴容操作。如果此時有其他的執行緒已經在執行擴容操作,那麼當前執行緒就協助擴容。


ConcurrentHashMap 採用了一些資料結構和手段來支援高效的並行計數。ConcurrentHashMap 使用 long 型別的 baseCount 成員變數和 CounterCell 型別的 counterCells 陣列來支援高效的並行計數。

  • baseCount 是基礎的計數值。主要通過呼叫 Unsafe 類的 compareAndSwap 方法更新 baseCount 的值
  • counterCells 陣列的使用:如果有多個執行緒呼叫 addCount() 方法增加元素的計數值,那麼每個執行緒將要增加的計數值儲存在 counterCells 陣列中。當呼叫 ConcurrentHashMap 的 size() 方法獲取元素個數時,size() 方法將回圈遍歷 counterCells 陣列,累加計數值得到當時元素。

ConcurrentHashMap 的計數將執行緒競爭分散到 counterCells 陣列的每一個元素,提高了並行計數的效能。

private transient volatile long baseCount;

// 如果 counterCells 陣列不為空,則陣列的長度為 2 的冪次方。
private transient volatile CounterCell[] counterCells;

@sun.misc.Contended static final class CounterCell {
    volatile long value;
    CounterCell(long x) { value = x; }
}