關注:王有志,一個分享硬核Java技術的互金摸魚俠。
歡迎你加入Java人的提桶跑路群:共同富裕的Java人
今天我們來聊一聊AQS家族中另一個重要成員Semaphore,我只收集到了一道關於Semaphore的面試題,問了問「是什麼」和「如何實現的」:
按照我們的慣例,依舊是按照「是什麼」,「怎麼用」和「如何實現的」這3步來分析Semaphore。另外,今天提供了題解。
Semaphore直譯過來是號誌,是電腦科學中非常Old School的處理同步與互斥的機制,與互斥鎖不同的是它允許指定數量的執行緒或程序存取共用資源。
Semaphore處理同步與互斥的機制和我們平時過地鐵站的閘機非常相似。刷卡開啟閘機(acquire
操作),通過後(存取臨界區)閘機關閉(release
操作),後面的人才能夠繼續刷卡,而在前一個人通過前,後面的人只能排隊等候(佇列機制)。當然,地鐵站不可能只有一個閘機,擁有幾個閘機,就允許幾個人同時通過。
號誌也是這樣的,通過建構函式定義許可數量,使用時申請許可,處理完業務邏輯後釋放許可:
// 號誌中定義1個許可
Semaphore semaphore = new Semaphore(1);
// 申請許可
semaphore.acquire();
......
// 釋放許可
semaphore.release();
當我們為Semaphore定義一個許可時,它和互斥鎖相同,同一時間只允許一個執行緒進入臨界區。但是當我們定義了多個許可時,它與互斥鎖的差異就體現出來了:
Semaphore semaphore = new Semaphore(3);
for(int i = 1; i < 5; i++) {
int finalI = i;
new Thread(()-> {
try {
semaphore.acquire();
System.out.println("第[" + finalI + "]個執行緒獲取到semaphore");
TimeUnit.SECONDS.sleep(10);
semaphore.release();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();
}
執行這段程式碼可以看到,同一時間3個執行緒都進入了臨界區,只有第4個執行緒被擋在了臨界區外。
還記得在《AQS的今生,構建出JUC的基礎》中提到的同步狀態嗎?我們當時說它是某些同步器的計數器:
AQS中,state不僅用作表示同步狀態,也是某些同步器實現的計數器,如:Semaphore中允許通過的執行緒數量,ReentrantLock中可重入特性的實現,都依賴於
state
作為計數器的特性。
先來看Semaphore與AQS的關係:
與ReentrantLock一樣,Semaphore內部實現了繼承自AQS的同步器抽象類Sync
,並有FairSync
和NonfairSync
兩個實現類。接下來我們就通過剖析Semaphore的原始碼,來驗證我們之前的說法。
Semaphore提供了兩個構造方法:
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
可以看到Semaphore和ReentrantLock的設計思路是一致的,Semaphore內部也實現了兩個同步器FairSync
和NonfairSync
,分別實現公平模式和非公平模式,而Semaphore的構造本質上是構造同步器的實現。我們以非公平模式的NonfairSync
的實現為例:
public class Semaphore implements java.io.Serializable {
static final class NonfairSync extends Sync {
NonfairSync(int permits) {
super(permits);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) {
setState(permits);
}
}
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
protected final void setState(int newState) {
state = newState;
}
}
追根溯源,構造器的引數permits
最終還是迴歸到了AQS的state
身上,藉助了state
作為計數器的特性來實現Semaphore的功能。
現在我們已經為Semaphore設定了一定數量的許可(permits),接下來我們就需要通過Semaphore#acquire
方法獲取許可,進入Semaphore所「守護」的臨界區:
public class Semaphore implements java.io.Serializable {
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
if (tryAcquireShared(arg) < 0) {
doAcquireSharedInterruptibly(arg);
}
}
}
這兩步和ReentrantLock非常相似,先通過tryAcquireShared
嘗試直接獲取許可,失敗後通過doAcquireSharedInterruptibly
加入到等待佇列中。
Semaphore中直接獲取許可的邏輯非常簡單:
static final class NonfairSync extends Sync {
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 獲取可用許可數量
int available = getState();
// 計算許可數量
int remaining = available - acquires;
if (remaining < 0 || compareAndSetState(available, remaining)) {
return remaining;
}
}
}
}
首先是獲取並減少可用許可的數量,當許可數量小於0時返回一個負數,或通過CAS更新許可數量成功後,返回一個正數。此時doAcquireSharedInterruptibly
會將當前的申請Semaphore許可的執行緒新增到AQS的等待佇列中。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
// 建立共用模式的等待節點
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 再次嘗試獲取許可,並返回剩餘許可數量
int r = tryAcquireShared(arg);
if (r >= 0) {
// 獲取成功,更新頭節點
setHeadAndPropagate(node, r);
p.next = null;
return;
}
}
// 獲取失敗進入等待狀態
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
throw new InterruptedException();
}
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
}
Semaphore的使用的doAcquireSharedInterruptibly
與ReentrantLock
使用的acquireQueued
方法核心邏輯一直,但是有細微的實現差別:
建立節點使用Node.SHARED
模式;
更新頭節點使用了setHeadAndPropagate
方法。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);
// 是否要喚醒等待中的節點
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared()) {
// 喚醒等待中的節點
doReleaseShared();
}
}
}
我們知道在ReentrantLock中執行acquireQueued
,當成功獲取鎖後,只需要執行setHead(node)
即可,那麼為什麼Semaphore還要再進行喚醒?
假設有3個許可的Semaphore同時有T1,T2,T3和T4總計4個執行緒競爭:
它們同時進入nonfairTryAcquireShared
方法,假設只有T1通過compareAndSetState(available, remaining)
成功修改有效的許可數量,T1進入臨界區;
T2,T3和T4進入doAcquireSharedInterruptibly
方法,通過addWaiter(Node.SHARED)
構建出AQS的等待佇列(參考AQS的今生中關於addWaiter
方法的分析);
假設T2成為了頭節點的直接後繼節點,T2再次執行tryAcquireShared
嘗試獲取許可,T3和T4執行parkAndCheckInterrupt
;
T2成功獲取許可並進入臨界區,此時Semaphore剩餘1個許可,而T3和T4處於暫停狀態中。
這種場景中,只有兩個許可產生了作用,顯然不符合我們對的初衷,因此在執行setHeadAndPropagate
更新頭節點時,判斷剩餘許可的數量,當數量大於0時繼續喚醒後繼節點。
Tips:
Semaphore在獲取許可的流程與ReentrantLock加鎖的過程高度相似~~
下文分析doReleaseShared
是如何喚醒等待中節點的。
Semaphore的release方法就非常簡單了:
public class Semaphore implements java.io.Serializable {
public void release() {
sync.releaseShared(1);
}
abstract static class Sync extends AbstractQueuedSynchronizer {
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
// 計算許可數量
int next = current + releases;
if (next < current) {
throw new Error("Maximum permit count exceeded");
}
// 通過CAS更新許可數量
if (compareAndSetState(current, next)) {
return true;
}
}
}
}
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
Node h = head;
// 判斷AQS的等待佇列是否為空
if (h != null && h != tail) {
int ws = h.waitStatus;
// 判斷當前節點是否處於待喚醒的狀態
if (ws == Node.SIGNAL) {
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)){
continue;
}
unparkSuccessor(h);
} else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) {
// 狀態為0時,更新節點的狀態為無條件傳播
continue;
}
}
if (h == head) {
break;
}
}
}
}
我們可以看到Semaphore的release
方法分了兩部分:
tryReleaseShared
方法更新Semaphore的有效許可數量;
doReleaseShared
喚醒處於等待中的節點。
喚醒的邏輯並不複雜,依舊是對節點狀態waitStatus
的判斷,來確定是否需要執行unparkSuccessor
,當狀態為ws == 0
,會將節點的狀態更新為Node.PROPAGAT
,即無條件傳播。
Tips:與ReentrantLock所不同的是,Semaphore並不支援Node.CONDITION
狀態,同樣的ReentrantLock也不支援Node.PROPAGATE
狀態。
關於Semaphore的內容到這裡就結束了,今天我們只具體分析了非公平模式下核心方法的實現,至於公平模式的實現,以及其它方法的實現,就留個大家自行探索了。
好了,希望本文能夠帶給你一些幫助,我們下次再見!最後歡迎大家關注王有志的專欄《Java面試都問啥?》。