本文主要講述AQS的共用模式,共用和獨佔具有類似的套路,所以如果你不清楚AQS的獨佔的話,可以看我的《JUC原始碼學習筆記1》
主要參考內容有《Java並行程式設計的藝術》,《Java並行程式設計實戰》和下面兩位博主的部落格
https://segmentfault.com/a/1190000016447307 這是我見過講AQS共用最好的部落格
https://www.cnblogs.com/micrari/p/6937995.html 這個文章是對PROPAGATE的作用比較好的詮釋
公司有五個坑位可以用來上廁所,對於一個廁所來說,五個坑位可以看作是五個共用的資源,同時可以允許五個員工(執行緒)來上廁所,當前任何一個員工進入其中一個坑位,那麼可用坑位(共用資源)減少,當員工出來的時候,共用資源被釋放,當全部都被人佔用的時候,後續上廁所的人需要等待(表現線上程獲取共用資源阻塞)當然這個等待可以被中斷(測試給等待的開發提了bug,開發放棄排隊回到工位)這個等待也可以超時(等太久心態崩了不等了)
Semaphore號誌來控制多個執行緒同時存取某個特定共用資源的運算元量
很直觀,我們可以意識到,Semaphore式基於AQS的共用模式
方法 | 描述 |
---|---|
public Semaphore(int permits) | 指定許可數量的構造方法(廁所有多少個坑位) |
public Semaphore(int permits, boolean fair) | 建立具有給定許可數量和給定公平設定的號誌(第二個引數指定釋放公平,好比說員工的素質,有沒有上廁所不排隊的人) |
public void acquire() throws InterruptedException | 可中斷的獲取一個許可,如果獲取許可,許可數量減少1方法返回,否則阻塞當前執行緒之到出現以下情況 1. 其他執行緒釋放了許可,並且當前執行緒獲得了許可(廁所出來了一個,而且你如願如廁) 2.其他執行緒中斷了當前執行緒(測試提bug打電話中斷了你的排隊) |
public void acquireUninterruptibly() | 和acquire() 類似,但是這個方法不響應中斷即在獲取許可的途中不會因為中斷了放棄(人有三急,天王老子來了也得先如廁) |
public boolean tryAcquire() | 嘗試獲取許可,如果成功獲取許可返回true並且減少許可,反之返回false,(你來到廁所隨便看了以下,有可以的坑位立馬進去,反之直接回工位) |
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException | 和tryAcquire()類似,但是響應中斷,支援超時,如果在指定時間餒獲取到共用資源返回true,如果超時未獲取返回false,如果獲取的途中被中斷丟擲中斷異常 |
public void release() | 釋放許可,並且許可數量加1(如廁完釋放坑位) |
public int availablePermits() | 返回此號誌中可用的當前許可數。 |
public int drainPermits() | 獲取並返回所有立即可用的許可證。 |
protected void reducePermits(int reduction) | 按指示的減少量減少可用許可證的數量。 |
acquire,acquireUninterruptibly,tryAcquire,release還有支援獲取指定數量共用資源的過載方法
顯而易見,Semaphore是基於AQS的共用模式,Semaphore方法的都是委託給Sync
Semaphore的acquire方法直接呼叫的是sync的acquireSharedInterruptibly(1),這個方法在sync的父類別AbstractQueuedSynchronizer中進行了實現
和ReentrantLock類似的套路,很多並行都是使用這種內部類的方式,把功能的實現交給內部類
相對於獨佔的鎖的`tryAcquire(int arg)`返回boolean型別的值,共用鎖的`tryAcquireShared(int acquires)`返回的是一個整型值:
- 如果該值小於0,則代表當前執行緒獲取共用鎖失敗
- 如果該值大於0,則代表當前執行緒獲取共用鎖成功,並且接下來其他執行緒嘗試獲取共用鎖的行為很可能成功
- 如果該值等於0,則代表當前執行緒獲取共用鎖成功,但是接下來其他執行緒嘗試獲取共用鎖的行為會失敗
直接呼叫的是nonfairTryAcquireShared方法
final int nonfairTryAcquireShared(int acquires) {
//一個自選
for (;;) {
//可用的許可
int available = getState();
//剩餘=可用-當前需要的許可
int remaining = available - acquires;// 1
//如果剩餘小於0 或 cas設定許可數量位true 返回剩餘剩餘許可數量
//值得品一品
if (remaining < 0 ||
compareAndSetState(available, remaining))// 2
return remaining;
}
}
自旋結束的情況
剩餘許可小於0 表示當前剩餘的許可不足以滿足我們的要求
當前許可可以滿足我們的需求,且成功CAS修改許可的數量
可能執行緒A 執行到1這一行發現是足夠的,但是當前很多執行緒在競爭資源,導致執行2的時候當前執行緒CAS失敗,那麼會進入下一輪迴圈
protected int tryAcquireShared(int acquires) {
for (;;) {
//如果前面有執行緒在等待,公平起見,返回-1 獲取共用資源失敗
if (hasQueuedPredecessors())
return -1;
//和非公平一樣
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
雖然在獨佔模式中沒有名稱叫doAcquireInterruptibly的方法,但是還是那個套路
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//構造節點,加入到同步佇列尾部
final Node node = addWaiter(Node.SHARED);
//獲取失敗標誌
boolean failed = true;
try {
//自選
for (;;) {
//前繼節點
final Node p = node.predecessor();
//前繼節點是頭節點
if (p == head) {
//嘗試獲取共用資源
int r = tryAcquireShared(arg);
//獲取成功
if (r >= 0) {
//設定為頭節點並且傳播
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//掛起當前執行緒 如果被中斷那麼直接丟擲中斷異常
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
//如果當前節點放棄了,這裡對應被中斷了(超時獲取方法在超時的情況也會進入)
if (failed)
cancelAcquire(node);
}
}
和獨佔不同的點在於:
addWaiter(Node.SHARED) 標記當前節點是共用模式
這個Node.SHARED設定到當前節點的nextWaiter屬性上,nextWaiter在此的作用只是標記當前節點的模式(獨佔or共用)
在Condition等待佇列中才起到串聯等待執行緒的作用的,後續會有專門一篇講解
獨佔的時候呼叫的是setHead方法,這裡呼叫的是 setHeadAndPropagate(當前執行緒節點,tryAcquireShared返回值(在號誌中可以理解為剩餘的許可證數量))
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
//設定為頭,頭在AQS是獲取到鎖的執行緒,也意味著從同步佇列中出隊了,
setHead(node);
//喚醒
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
doReleaseShared方法的作用的是在當前共用鎖是可獲取的狀態時,喚醒head節點的下一個節點,這個方法的詳細作用的後面講解,現在我們來分析,setHeadAndPropagate在什麼情況下會呼叫這個方法
剩餘的共用資源大於0 propagate > 0
在共用鎖模式下,鎖可以被多個執行緒所共同持有,既然當前執行緒已經拿到共用鎖了,其還有剩餘的共用資源,那麼就可以直接通知後繼節點來拿鎖,而不必等待鎖被釋放的時候再通知。(來到廁所發現五個坑都可用,發訊息給好兄弟,快來,拉屎自由)
當前節點的下一個節點不為空且是共用模式 if (s == null || s.isShared())
舊頭節點等待狀態小於0 or 當前頭節點等待狀態小於0
共用資源在被獲取後,執行緒都會設定自己為頭節點,所有頭節點在共用模式中表示的是獲取到共用資源的執行緒或者曾經獲取共用資源的執行緒
在當前共用鎖是可獲取的狀態時,喚醒head節點的下一個節點
這個方法除了在setHeadAndPropagate 中被呼叫意外,還在共用資源的釋放(releaseShared)中會被呼叫,想象一個場景,存在一個執行緒A釋放鎖的同時,一個執行緒B拿到鎖,前者呼叫releaseShared,後者呼叫setHeadAndPropagate ,並行的呼叫到doReleaseShared 方法進行喚醒頭節點下一個節點,所以doReleaseShared 需要考慮執行緒安全問題
//值得好好品一品
private void doReleaseShared() {
//迴圈
for (;;) {
//頭 可能這一行執行完h就是舊的頭,存在另外一個執行緒獲取到共用鎖,將自己設定為頭
Node h = head;
//h頭不為null 不等於尾,說明至少需要當前佇列中至少有兩個節點
if (h != null && h != tail) {
//h頭的狀態
int ws = h.waitStatus;
//h頭狀態為SINGNAL 說明後續節點入隊的時候addWaiter把當前節點的狀態設定,說明後續節點需要喚醒
if (ws == Node.SIGNAL) {
//CAS修改h狀態為0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
//喚醒後續執行緒
unparkSuccessor(h);
}
//h狀態為0 且CAS設定為PROPAGATE 失敗
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
//h 等於當前頭,說明執行上面一段的時候沒有頭沒有變化,沒有其他執行緒獲取共用資源
if (h == head)
break;
}
}
if (h == head)
break;
這裡h是剛進入doReleaseShared 時候的頭節點,head是當前佇列的頭,如果二者相等那麼退出迴圈
一個廁所有五個坑,在某一個時刻五個坑被ABCDE佔用,後面還有EF兩個倒黴等待排成佇列,ABCDE佔用坑的時候都會設定自己為頭節點,會有幾個人獲取到坑位的時候呼叫doReleaseShared (比如D第四個來,發現還有一個坑,立馬說,後面的兄弟還有一個廁所)再比如五個坑都被佔用但是E發現自己的狀態為SINGAL(是E排隊的時候提醒自己拉完提醒自己,他先玩會兒手機(掛起))
某一個時刻多個人拉完的時候,釋放坑位走出廁所,A釋放到if (h == head)的時候,發現頭節點變化了,繼續喊兄弟們去看看說不定有坑位,B也是如此,同一個時間可能有多個拉完的人都在喚醒後面的人去上廁所,這樣後面排隊玩手機的人,被喚醒的效率更高,從而提升了廁所的利用效率
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//1.情況一
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
//2.情況二
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
情況一
ws == Node.SIGNAL
說明是後繼節點入隊的時候修改了節點的狀態,表示需要喚醒
這裡使用CAS保證多個執行緒執行當前方法只有一個執行緒可以成功喚醒後續的執行緒(共用鎖存在多個執行緒並行喚醒的情況)
同時兩個小夥子從廁所出來,只需要一個人通知到等待的人就好
情況二
ws == 0
可能是頭節點沒有後繼入隊,所以節點狀態為初始狀態,但是上面的 if (h != null && h != tail)
保證了佇列中至少存在兩個節點
ws == 0
還可能是上面的情況1修改為了0,但是這種情況不會進入當前分支
最後只可能是尾節點成為了頭節點,compareAndSetWaitStatus(h, 0, Node.PROPAGATE)
需要返回false才能繼續迴圈,說明後續節點入隊修改了節點的狀態為SIGANAL,此時會繼續迴圈喚醒後繼節點。注意到最上面if (h != null && h != tail)
也就是說佇列至少存在兩個節點,讓程式碼執行到情況1,要想ws == Node.SIGNAL
不成立說明這個頭節點剛剛成為頭節點,狀態還沒來得及被後繼節點修改為SINGANL,緊接著後繼節點恰好修改了頭節點狀態為SINGAL才能促使!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)
為true,也就是說明情況二是在新的頭節點產生,且沒來得及被後繼節點修改為SINGAL,並且在頭節點執行緒執行compareAndSetWaitStatus(h, 0, Node.PROPAGATE)
的一瞬間後繼節點搶先一步修改了頭節點的狀態為SINGAL才能走到情況二的continue中
10:40:01的時候A成功獲得如廁的權利,此時廁所是滿的,A設定自己為頭節點,發現原來的頭節點狀態是SINGANL,他準備喚醒後面排隊的兄弟
10:40:02 B發現沒有廁所,排在佇列的第一個,準備修改A的狀態為SINGAL(讓A記得喚醒自己)此時A已經在執行喚醒的流程了,此時佇列存在兩個節點,A為頭,B為尾巴,A執行到情況1,發現自己不是SINGAL,來到情況2,準備修改自己狀態為PROPAGATE但是失敗了(此時B剛好修改A狀態為SINGAL了)A繼續執行for迴圈(可能存在其他人上完廁所,喚醒了B,B成為新頭節點),
此時A會拿到佇列的頭節點(最近剛剛獲得鎖的節點)繼續執行for迴圈,最後佇列的頭節點沒有變化了,A才罷休
直接呼叫的是AQS的acquireShared(1)方法
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
大致邏輯和acquireSharedInterruptibly,其不響應中斷體現在doAcquireShared中
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
//如果被中斷了,那麼補上中斷,而不是丟擲異常
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//parkAndCheckInterrupt() 放回true 表示當前執行緒從LockSupport中返回是因為被中斷了,那麼把interrupted置為true,繼續迴圈
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
套路和獨佔模式中類似,當前執行緒從LockSupport中返回後檢查其中斷表示,發現是由中斷那麼會在當前獲取到共用資源後補上中斷標識
直接呼叫AQS的tryAcquireSharedNanos方法
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//如果直接獲取共用資源成功那麼直接返回true了 短路後續的doAcquireSharedNanos
return tryAcquireShared(arg) >= 0 ||
//超時獲取共用資源
doAcquireSharedNanos(arg, nanosTimeout);
}
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null;
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
和獨佔的超時獲取一樣的套路,都是必須剩餘時間大於spinForTimeoutThreshold閾值才掛起否則進行自旋,響應中斷也是類似的內容
直接呼叫的AQS的releaseShared
public final boolean releaseShared(int arg) {
//號誌中靜態內部類重寫
if (tryReleaseShared(arg)) {
//喚醒後續等待執行緒 前面說過
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int releases) {
//自旋
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//CAS設定許可的數量,
if (compareAndSetState(current, next))
return true;
}
}
和獨佔不同的是,共用鎖存在多個多個執行緒一起釋放的情況,所以使用自選+CAS保證許可證的數量不會在並行的情況下出現錯誤
減少許可的邏輯類似,也是迴圈+CAS的方式
閉鎖的工具相當於一扇門,在閉鎖達到結束狀態前,門一直是關閉,沒有任何執行緒可以通過,當達到結束狀態後,閉鎖才會讓允許所有執行緒通過,當閉鎖到達結束狀態後不會再改變狀態保持開狀態
方法 | 作用 |
---|---|
void await() throws InterruptedException | 使當前執行緒等待直到鎖存器倒計時到零,從這個方法返回的兩個方式 1.計數到0 2.等待的執行緒被中斷,丟擲中斷異常返回 |
void await(long timeout, TimeUnit unit)throws InterruptedException | 和await()類似,但是如果超時也會直接返回 |
void countDown() | 計數減1,如果計數到達0那麼所有等待的執行緒將可以通行 |
long getCount() | 返回當前計數。此方法通常用於偵錯和測試目的 |
AB執行緒需要等待CDE執行完後繼續執行,其實CDE霸佔鎖阻塞AB,後CDE都釋放鎖後AB才能繼續執行
直接呼叫靜態內部類的acquireSharedInterruptibly(1)方法,這個方法會直接呼叫靜態內部類範例的tryAcquireShared(1)方法
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
獲取AQS中state(CountDownLatch構造方法設定此值,表示需要等待多少個執行緒執行countDown),上面我們說過tryAcquireShared返大於等於0表示獲取共用資源成功,負數表示失敗後續會進入等待佇列中,這裡並沒有返回0這種情況,如果共用資源為0表示「門開了」執行此方法的執行緒可以自由的執行了,反之將排隊等待,之所以沒有返回0,是因為CountDownLatch支援多個執行緒比如ABC一起等待,返回0表示當前執行緒獲取資源成功但是後續執行緒獲取會失敗,返回1可以保證當前執行緒看見門開了後會去喚醒其他執行緒
直接呼叫的AQS的tryAcquireSharedNanos,同樣呼叫重寫的tryAcquireShared 方法,後續呼叫doAcquireSharedNanos 邏輯和上面號誌的內容一樣
直接呼叫AQS的releaseShared方法,呼叫到重寫的tryReleaseShared方法
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
//減少之前就是0 直接返回false
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
//減少到0 才會返回true
return nextc == 0;
}
}
還是同樣的套路CAS+自選保證state,但是如果減少之前就已經為0的話的返回false,且只有減少後為0 才會返回true,保證了無效的多次喚醒
假如計數為4,但是五個執行緒執行countDown,第五個執行緒執行的時候發現被四個老六搶先一步了,直接返回了false,前面四個老六執行的時候也只有最後一個才會返回true(減少到0)這時候才會執行tryReleaseShared 進行喚醒
可以看到當多個執行緒釋放共用資源的時候,如果當前佇列中存在排隊的節點,那麼可能存在多執行緒一起並行呼叫doReleaseShared的可能,如果頭節點為signal 說明後續節點需要喚醒,使用CAS保證了只有一個執行緒可以成功執行unparkSuccessor喚醒後續執行緒,後續執行緒也許在此之前執行了tryAcquireShard返回負數,準備掛起自己,也許在掛起自己之前被執行了unpark,或者掛起之後立馬被執行了unpark,繼續拿共用資源,而那些CAS失敗的執行緒會繼續喚醒,這點體現了三個資源釋放,不會只喚醒一個。並且這個方法退出的方法只有在喚醒途中頭節點沒有變化的情況,沒有變法說明共用資源的爭搶沒有那麼激烈了(頭節點是最近拿到共用資源的節點)
這個時候執行緒B 和執行緒A 必定存在一個執行緒CAS失敗,如果執行緒B失敗,那麼意味執行緒A成功CAS為SIGNAL,但是shouldParkAfterFailedAcquire 返回false 還要繼續自旋,這時候也許tryAcquireShared成功了就沒有必要掛起了,如果執行緒A自選到tryAcquireShared,被一個老六執行緒搶先一步獲取共用共用資源了,這時候執行緒A會執行shouldParkAfterFailedAcquire 返回true 準備掛起自己了,這是執行緒B也許就成功喚醒了執行緒A。如果執行緒ACAS失敗了,還會進行一次自旋,執行緒B如果CAS成功也會進行一次自旋,也許執行緒A就成功拿到共用資源改變自己為頭節點,執行緒B還要執行一次自旋。這一切都為了提高系統的吞吐量讓共用資源儘量不要浪費,不要因為喚醒的不及時而讓需要應該工作的執行緒被掛起。