ConCurrentHashMap
是一個支援高並行集合,常用的集合之一,在jdk1.8
中ConCurrentHashMap
的結構和操作和HashMap
都很類似:
陣列+連結串列/紅黑樹
。get
通過計算hash值後取模陣列長度確認索引來查詢元素。put
方法也是先找索引位置,然後不存在就直接新增,存在相同key
就替換。table
陣列,原來的資料轉移到新的table
陣列中。唯一不同的是,HashMap
不支援並行操作,ConCurrentHashMap
是支援並行操作的。所以ConCurrentHashMap
的設計也比HashMap
也複雜的多,通過閱讀ConCurrentHashMap
的原始碼,也更加了解一些並行的操作,比如:
volatile
執行緒可見性CAS
樂觀鎖synchronized
同步鎖/悲觀鎖詳見HashMap
相關文章:
ConCurrentHashMap
是由陣列+連結串列/紅黑樹
組成的:
其中左側部分是一個雜湊表
,通過hash演演算法
確定元素在陣列的下標:
transient volatile Node<K,V>[] table;
連結串列是為了解決hash衝突
,當發生衝突的時候。採用連結串列法
,將元素新增到連結串列的尾部。其中Node
節點儲存資料:
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
}
Node
節點包含:
hash
hash值key
值value
值next
next指標// 最大容量
int MAXIMUM_CAPACITY = 1 << 30;
// 初始化容量
int DEFAULT_CAPACITY = 16
// 控制陣列初始化或者擴容,為負數時,表示陣列正在初始化或者擴容。-1表示正在初始化。其他情況-n表示n執行緒正在擴容。
private transient volatile int sizeCtl;
// 裝載因子
float LOAD_FACTOR = 0.75f
// 連結串列長度為 8 轉成紅黑樹
int TREEIFY_THRESHOLD = 8
// 紅黑樹長度小於6退化成連結串列
int UNTREEIFY_THRESHOLD = 6;
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 計算hash值
int h = spread(key.hashCode());
// 判斷 tab 不為空並且 tab對應的下標不為空
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// eh < 0 表示遇到擴容
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 遍歷連結串列,直到遍歷key相等的值
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
spread
獲取hash
值,通過(n - 1) & h
取餘獲取陣列下標的資料。eh<0
表示遇到了擴容,會呼叫正在擴容節點ForwardingNode
的find
方法,查詢該節點,匹配就返回。null
。get方法裡面沒有使用到鎖,那是如何實現執行緒安全。主要使用到了volatile
。
volatile
一個執行緒對共用變數的修改,另外一個執行緒能夠立刻看到,我們稱為可見性
。
cpu
執行速度比記憶體速度快很多,為了均衡和記憶體之間的速度差異,增加了cpu快取
,如果在cpu快取中存在cpu
需要資料,說明命中了cpu
快取,就不經過存取記憶體。如果不存在,則要先把記憶體的資料載入到cpu
快取中,在返回給cpu
處理器。
在多核cpu
的伺服器中,每個cpu
都有自己的快取,cpu
之間的快取是不共用的。 當多個執行緒在不同的cpu
上執行時,比如下圖中,執行緒A
操作的是cpu-1
上的快取,執行緒B
操作的是cpu-2
上的快取,這個時候,執行緒A
對變數V
的操作對於執行緒B
是不可見的
。
但是一個變數被volatile
宣告,它的意思是:
告訴編譯器,對這個變數的讀寫,不能使用cpu快取,必須從記憶體中讀取或者寫入。
上面的變數V被volatile
宣告,執行緒A在cup-1中修改了資料,會直接寫到記憶體中,不會寫入到cpu快取中。而執行緒B無法從cpu快取讀取變數,需要從主記憶體拉取資料。
volatile
關鍵字的變數會將修改的變數強制寫入記憶體中。volatile
在get
應用table
雜湊表transient volatile Node<K,V>[] table;
使用volatile
宣告陣列,表示參照地址
是volatile
而不是陣列元素
是volatile
。
既然不是
陣列元素
被修飾成volatile
,那實現執行緒安全在看Node
節點。
Node
節點static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
}
其中val
和next
都用了volatile
修飾,在多執行緒環境下,執行緒A修改節點val
或者新增節點對別人執行緒是可見的
。
所以get
方法使用無鎖操作是可以保證執行緒安全
。
既然
volatile
修飾陣列對get
操作沒有效果,那加在volatile
上有什麼目的呢?
是為了陣列在擴容的時候對其他執行緒具有可見性。
val
和next
都用volatile
修飾,保證執行緒修改或者新增節點對別人執行緒是可見的。volatile
修飾table
陣列,保證陣列在擴容時其它執行緒是具有可見性的。put(K key, V value)
直接呼叫putVal(key, value, false)
方法。
public V put(K key, V value) {
return putVal(key, value, false);
}
putVal()
方法:
final V putVal(K key, V value, boolean onlyIfAbsent) {
// key或者value為空,報空指標錯誤
if (key == null || value == null) throw new NullPointerException();
// 計算hash值
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
// tab為空或者長度為0,初始化table
tab = initTable();
// 使用volatile查詢索引下的資料
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 索引位置沒有資料,使用cas新增資料
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// MOVED表示陣列正在進行陣列擴容,當前進行也參加到陣列複製
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 陣列不在擴容和也有值,說明資料下標處有值
// 連結串列中有資料,使用synchronized同步鎖
synchronized (f) {
if (tabAt(tab, i) == f) {
// 為連結串列
if (fh >= 0) {
binCount = 1;
// 遍歷連結串列
for (Node<K,V> e = f;; ++binCount) {
K ek;
// hash 以及key相同,替換value值
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 遍歷到連結串列尾,新增連結串列節點
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 紅黑樹,TreeBin雜湊值固定為-2
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 連結串列轉紅黑樹
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
key
或者value
為null
都會報空指標錯誤。hash
值,然後開啟沒有終止條件的迴圈。table
陣列為null
,初始化陣列。table
不為空,通過volatile
找到陣列對應下標是否為空,為空就使用CAS
新增頭結點。hash
=-1
表示陣列正在擴容,一起進行擴容操作。synchronized
鎖住當前位置的節點,防止被其他執行緒修改。
key
替換value
值。如果連結串列找不到,就新增到連結串列尾部。8
,連結串列就轉成紅黑樹。
ConcurrentHashMap
鍵值對為什麼都不能為null
,而HashMap
就可以?
通過get
獲取資料時,如果獲取的資料是null
,就無法判斷,是put時的value為null,還是找個key就沒做過對映。HashMap是非並行的,可以通過contains(key)
判斷,而支援並行的ConcurrentHashMap
在呼叫contains
方法和get
方法的時候,map
可能已經不同了。參考
如果陣列table
為空呼叫initTable
初始化陣列:
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// table 為 null
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
// sizeCtl<0表示其它執行緒正在初始化陣列陣列,當前執行緒需要讓出CPU
Thread.yield(); // lost initialization race; just spin
// 呼叫CAS初始化table表
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
initTable
判斷sizeCtl
值,如果sizeCtl
為-1
表示有其他執行緒正在初始化陣列,當前執行緒呼叫Thread.yield
讓出CPU
。而正在初始化陣列的執行緒通過Unsafe.compareAndSwapInt
方法將sizeCtl
改成-1
。
initTable
最外層一直使用while
迴圈,而非if
條件判斷,就是確保陣列可以初始化成功。
陣列初始化成功之後,再執行新增的操作,呼叫tableAt
通過volatile
的方式找到(n-1)&hash
處的bin
節點。
CAS
新增節點。synchronized
鎖,索引對應的bin
節點,進行新增或者更新操作。Insertion (via put or its variants) of the first node in an
empty bin is performed by just CASing it to the bin. This is
by far the most common case for put operations under most
key/hash distributions. Other update operations (insert,
delete, and replace) require locks. We do not want to waste
the space required to associate a distinct lock object with
each bin, so instead use the first node of a bin list itself as
a lock. Locking support for these locks relies on builtin
"synchronized" monitors.
如果f的hash值為-1,說明當前f是ForwaringNode節點,意味著有其它執行緒正在擴容,則一起進行擴容操作。
完成新增或者更新操作之後,才執行break
終止最外層沒有終止條件的for迴圈,確保資料可以新增成功。
最後執行addCount
方法。
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 利用CAS更新baseCoount
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
// check >= 0,需要檢查是否需要進行擴容操作
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
*插入一個新的節點:
treeifyBin
把連結串列轉成紅黑樹,在轉成之前,會判斷陣列長度小於MIN_TREEIFY_CAPACITY
,預設是64
,觸發擴容。put
方法,在結尾addCount
方法記錄元素個數,並檢查是否進行擴容,陣列元素達到閾值時,觸發擴容。不使用加鎖的,支援多執行緒擴容。利用並行處理減少擴容帶來效能的影響。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
// 建立nextTab,容量為原來容量的兩倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
// 擴容是丟擲異常,將閾值設定成最大,表示不再擴容。
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
// 建立 ForwardingNode 節點,作為標記位,表明當前位置已經做過桶處理
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// advance = true 表明該節點已經處理過了
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 控制 --i,遍歷原hash表中的節點
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// 用CAS計算得到的transferIndex
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// 將原陣列中的節點賦值到新的陣列中,nextTab賦值給table,清空nextTable。
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 所有節點完成複製工作,
if (finishing) {
nextTable = null;
table = nextTab;
// 設定新的閾值為原來的1.5倍
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 利用CAS方法更新擴容的閾值,sizeCtl減一,說明新加入一個執行緒參與到擴容中
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
// 遍歷的節點為null,則放入到ForwardingNode指標節點
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// f.hash==-1表示遍歷到ForwardingNode節點,說明該節點已經處理過了
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// 節點加鎖
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// fh>=0,表示為連結串列節點
if (fh >= 0) {
// 構建兩個連結串列,一個是原連結串列,另一個是原連結串列的反序連結串列
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 在nextTable i 位置處插入連結串列
setTabAt(nextTab, i, ln);
// 在nextTable i+n 位置處插入連結串列
setTabAt(nextTab, i + n, hn);
// 在table i的位置處插上ForwardingNode,表示該節點已經處理過
setTabAt(tab, i, fwd);
// 可以執行 --i的操作,再次遍歷節點
advance = true;
}
// TreeBin紅黑樹,按照紅黑樹處理,處理邏輯和連結串列類似
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 擴容後樹節點的個數<=6,紅黑樹轉成連結串列
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
擴容過程有的複雜,主要涉及到多執行緒的並行擴容,ForwardingNode
的作用就是支援擴容操作,將已經處理過的節點和空節點置為ForwardingNode
,並行處理時多個執行緒處理ForwardingNode
表示已經處理過了,就往後遍歷。
ConcurrentHashMap
是基於陣列+連結串列/紅黑樹
的資料結構,新增、刪除、更新都是先通過計算key
的hash
值確定資料的索引值,這和HashMap
是類似的,只不過ConcurrentHashMap
針對並行做了更多的處理。
get
方法獲取資料,先計算hash
值再再和陣列長度取餘操作獲取索引位置。
volatile
關鍵字找到table
保證多執行緒環境下,陣列擴容具有可見性
,而Node
節點中val
和next
指標都使用volatile
修飾保證資料修改後別的執行緒是可見的
。這就保證了ConcurrentHashMap
的執行緒安全性
。put
方法新增或者更新資料。
key
或value
為空,就報錯。這是因為在呼叫get
方法獲取資料為null
,無法判斷是獲取的資料為null,還是對應的key
就不存在對映,HashMap
可以通過contains(key)
判斷,而ConcurrentHashMap
在多執行緒環境下呼叫contains
和get
方法的時候,map
可能就不同了。table
陣列為空,先初始化陣列,先通過sizeCtl
控制並行,如果小於0表示有別的執行緒正在初始化陣列,就讓出CPU
,否則使用CAS
將sizeCtl
設定成-1
。CAS
新增節點。transfer
擴容
8
,會將連結串列轉成紅黑樹,在轉成之前,會先判斷陣列長度小於64
,會觸發擴容。還有集合個數達到閾值時也會觸發擴容。ForwardingNode
節點作為標記位,如果遍歷到該節點,說明已經做過處理。Java並行——ConcurrentHashMap(JDK 1.8)