18.詳解AQS家族的成員:Semaphore

2023-06-08 12:01:36

關注:王有志,一個分享硬核Java技術的互金摸魚俠。
歡迎你加入Java人的提桶跑路群共同富裕的Java人

今天我們來聊一聊AQS家族中另一個重要成員Semaphore,我只收集到了一道關於Semaphore的面試題,問了問「是什麼」和「如何實現的」:

  • 什麼是Semaphore?它是如何實現的?

按照我們的慣例,依舊是按照「是什麼」,「怎麼用」和「如何實現的」這3步來分析Semaphore。另外,今天提供了題解

Semaphore的使用

Semaphore直譯過來是號誌,是電腦科學中非常Old School的處理同步與互斥的機制與互斥鎖不同的是它允許指定數量的執行緒或程序存取共用資源

Semaphore處理同步與互斥的機制和我們平時過地鐵站的閘機非常相似。刷卡開啟閘機(acquire操作),通過後(存取臨界區)閘機關閉(release操作),後面的人才能夠繼續刷卡,而在前一個人通過前,後面的人只能排隊等候(佇列機制)。當然,地鐵站不可能只有一個閘機,擁有幾個閘機,就允許幾個人同時通過。

號誌也是這樣的,通過建構函式定義許可數量,使用時申請許可,處理完業務邏輯後釋放許可:

// 號誌中定義1個許可
Semaphore semaphore = new Semaphore(1);

// 申請許可
semaphore.acquire();

......

// 釋放許可
semaphore.release();

當我們為Semaphore定義一個許可時,它和互斥鎖相同,同一時間只允許一個執行緒進入臨界區。但是當我們定義了多個許可時,它與互斥鎖的差異就體現出來了:

Semaphore semaphore = new Semaphore(3);
for(int i = 1; i < 5; i++) {
  int finalI = i;
  new Thread(()-> {
    try {
      semaphore.acquire();
      System.out.println("第[" + finalI + "]個執行緒獲取到semaphore");
      TimeUnit.SECONDS.sleep(10);
      semaphore.release();
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }).start();
}

執行這段程式碼可以看到,同一時間3個執行緒都進入了臨界區,只有第4個執行緒被擋在了臨界區外。

Semaphore的實現原理

還記得在《AQS的今生,構建出JUC的基礎》中提到的同步狀態嗎?我們當時說它是某些同步器的計數器:

AQS中,state不僅用作表示同步狀態,也是某些同步器實現的計數器,如:Semaphore中允許通過的執行緒數量,ReentrantLock中可重入特性的實現,都依賴於state作為計數器的特性。

先來看Semaphore與AQS的關係:

與ReentrantLock一樣,Semaphore內部實現了繼承自AQS的同步器抽象類Sync,並有FairSyncNonfairSync兩個實現類。接下來我們就通過剖析Semaphore的原始碼,來驗證我們之前的說法。

構造方法

Semaphore提供了兩個構造方法:

public Semaphore(int permits) {
  sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
  sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

可以看到Semaphore和ReentrantLock的設計思路是一致的,Semaphore內部也實現了兩個同步器FairSyncNonfairSync,分別實現公平模式和非公平模式,而Semaphore的構造本質上是構造同步器的實現。我們以非公平模式的NonfairSync的實現為例:

public class Semaphore implements java.io.Serializable {
  static final class NonfairSync extends Sync {
    NonfairSync(int permits) {
      super(permits);
    }
  }
  
  abstract static class Sync extends AbstractQueuedSynchronizer {
    Sync(int permits) {
      setState(permits);
    }
  }
}

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
  protected final void setState(int newState) {
    state = newState;
  }
}

追根溯源,構造器的引數permits最終還是迴歸到了AQS的state身上,藉助了state作為計數器的特性來實現Semaphore的功能。

acquire方法

現在我們已經為Semaphore設定了一定數量的許可(permits),接下來我們就需要通過Semaphore#acquire方法獲取許可,進入Semaphore所「守護」的臨界區:

public class Semaphore implements java.io.Serializable {
  public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
  }
}

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
  public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted()) {
      throw new InterruptedException();
    }
    if (tryAcquireShared(arg) < 0) {
      doAcquireSharedInterruptibly(arg);
    }
  }
}

這兩步和ReentrantLock非常相似,先通過tryAcquireShared嘗試直接獲取許可,失敗後通過doAcquireSharedInterruptibly加入到等待佇列中。

Semaphore中直接獲取許可的邏輯非常簡單:

static final class NonfairSync extends Sync {
  protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
  }
}

abstract static class Sync extends AbstractQueuedSynchronizer {
  final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
      // 獲取可用許可數量
      int available = getState();
      // 計算許可數量
      int remaining = available - acquires;
      if (remaining < 0 || compareAndSetState(available, remaining)) {
        return remaining;
      }
    }
  }
}

首先是獲取並減少可用許可的數量,當許可數量小於0時返回一個負數,或通過CAS更新許可數量成功後,返回一個正數。此時doAcquireSharedInterruptibly會將當前的申請Semaphore許可的執行緒新增到AQS的等待佇列中。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {    
 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
   // 建立共用模式的等待節點
   final Node node = addWaiter(Node.SHARED);
   try {
     for (;;) {
       final Node p = node.predecessor();
       if (p == head) {
         // 再次嘗試獲取許可,並返回剩餘許可數量
         int r = tryAcquireShared(arg);
         if (r >= 0) {
           // 獲取成功,更新頭節點
           setHeadAndPropagate(node, r);
           p.next = null;
           return;
         }
       }
       // 獲取失敗進入等待狀態
       if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
         throw new InterruptedException();
       }
     }
   } catch (Throwable t) {
     cancelAcquire(node);
     throw t;
   }
 }
}

Semaphore的使用的doAcquireSharedInterruptiblyReentrantLock使用的acquireQueued方法核心邏輯一直,但是有細微的實現差別:

  • 建立節點使用Node.SHARED模式;

  • 更新頭節點使用了setHeadAndPropagate方法。

private void setHeadAndPropagate(Node node, int propagate) {
  Node h = head;
  setHead(node);
  
  // 是否要喚醒等待中的節點
  if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
    Node s = node.next;
    if (s == null || s.isShared()) {
      // 喚醒等待中的節點
      doReleaseShared();
    }
  }
}

我們知道在ReentrantLock中執行acquireQueued,當成功獲取鎖後,只需要執行setHead(node)即可,那麼為什麼Semaphore還要再進行喚醒?

假設有3個許可的Semaphore同時有T1,T2,T3和T4總計4個執行緒競爭:

  • 它們同時進入nonfairTryAcquireShared方法,假設只有T1通過compareAndSetState(available, remaining)成功修改有效的許可數量,T1進入臨界區;

  • T2,T3和T4進入doAcquireSharedInterruptibly方法,通過addWaiter(Node.SHARED)構建出AQS的等待佇列(參考AQS的今生中關於addWaiter方法的分析);

  • 假設T2成為了頭節點的直接後繼節點,T2再次執行tryAcquireShared嘗試獲取許可,T3和T4執行parkAndCheckInterrupt

  • T2成功獲取許可並進入臨界區,此時Semaphore剩餘1個許可,而T3和T4處於暫停狀態中。

這種場景中,只有兩個許可產生了作用,顯然不符合我們對的初衷,因此在執行setHeadAndPropagate更新頭節點時,判斷剩餘許可的數量,當數量大於0時繼續喚醒後繼節點。

Tips

  • Semaphore在獲取許可的流程與ReentrantLock加鎖的過程高度相似~~

  • 下文分析doReleaseShared是如何喚醒等待中節點的。

release方法

Semaphore的release方法就非常簡單了:

public class Semaphore implements java.io.Serializable {
  public void release() {
    sync.releaseShared(1);
  }
  
  abstract static class Sync extends AbstractQueuedSynchronizer {
    protected final boolean tryReleaseShared(int releases) {
      for (;;) {
        int current = getState();
        // 計算許可數量
        int next = current + releases;
        if (next < current) {
          throw new Error("Maximum permit count exceeded");
        }
        // 通過CAS更新許可數量
        if (compareAndSetState(current, next)) {
            return true;
        }
      }
    }
  }
}

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
  public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
      doReleaseShared();
      return true;
    }
    return false;
  }
  
  private void doReleaseShared() {
    for (;;) {
      Node h = head;
      // 判斷AQS的等待佇列是否為空
      if (h != null && h != tail) {
        int ws = h.waitStatus;
        // 判斷當前節點是否處於待喚醒的狀態
        if (ws == Node.SIGNAL) {
          if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)){
            continue;
          }
          unparkSuccessor(h);
        } else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) {
          // 狀態為0時,更新節點的狀態為無條件傳播
          continue;
        }
      }
      if (h == head) {
        break;
      }
    }
  }
}

我們可以看到Semaphore的release方法分了兩部分:

  • tryReleaseShared方法更新Semaphore的有效許可數量;

  • doReleaseShared喚醒處於等待中的節點。

喚醒的邏輯並不複雜,依舊是對節點狀態waitStatus的判斷,來確定是否需要執行unparkSuccessor,當狀態為ws == 0,會將節點的狀態更新為Node.PROPAGAT,即無條件傳播。

Tips:與ReentrantLock所不同的是,Semaphore並不支援Node.CONDITION狀態,同樣的ReentrantLock也不支援Node.PROPAGATE狀態。

結語

關於Semaphore的內容到這裡就結束了,今天我們只具體分析了非公平模式下核心方法的實現,至於公平模式的實現,以及其它方法的實現,就留個大家自行探索了。

好了,希望本文能夠帶給你一些幫助,我們下次再見!最後歡迎大家關注王有志的專欄《Java面試都問啥?》。