15.AQS的今生,構建出JUC的基礎

2023-03-29 12:01:26

大家好,我是王有志,歡迎和我聊技術,聊漂泊在外的生活。快來加入我們的Java提桶跑路群:共同富裕的Java人

AQS的前世,從1990年的論文說起》中我們已經對AQS做了簡單的介紹,並學習了先於AQS出現的3種基於排隊思想的自旋鎖。今天我們深入到AQS的設計中,探究Doug Lea是如何構建JUC框架基礎元件的。不過在正式開始前,我們先來回顧上一篇中提到的面試題:

  • 原理相關:AQS是什麼?它是怎樣實現的?

  • 設計相關:如何使用AQS實現Mutex?

希望今天可以幫你解答上面的問題。

Tips

初衷與目的

《The java.util.concurrent Synchronizer Framework》中清晰的闡述了Doug Lea設計AQS的目的:

This framework provides common mechanics for atomically managing synchronization state, blocking and unblocking threads, and queuing.

(AQS)框架為同步狀態的原子性管理,執行緒的阻塞和喚醒以及排隊提供了一種通用的機制。也就是說,可以通過AQS去構建不同的同步器,如:基於AQS而誕生的ReentrantLock

基於構建通用同步機制的目的,Doug Lea分析了各種同步器,總結出它們共同的特性:

  • acquire操作:阻塞呼叫執行緒,直到同步狀態允許其繼續執行;

  • release操作:改變同步狀態,喚醒被阻塞的執行緒。

除此之外,論文中也提到了對AQS的效能要求,Doug Lea認為大家在分析synchronized時提到的2個問題:

  • 如何最小化空間開銷(因為任意Java物件都可以作為鎖)

  • 如何最小化單核處理器的單執行緒環境下的時間開銷

都不是AQS要考慮的,他認為AQS需要考慮的是scalability(可伸縮性),即大部分場景中,即便存在競爭,也能提供穩定的效率。原文中是這樣描述的:

Among the main goals is to minimize the total amount of time during which some thread is permitted to pass a synchronization point but has not done so.

(AQS)主要目標之一是使某一執行緒被允許通過同步點但還沒有通過的情況下耗費的總時間最少,即從一個執行緒釋放鎖開始,到另一個執行緒獲取鎖,這個過程鎖消耗的時間。

設計與實現

Doug Lea先是完成了acquire操作和release操作的虛擬碼設計:

// acquire操作
while (synchronization state does not allow acquire) {
    enqueue current thread if not already queued;
    possibly block current thread;
}
dequeue current thread if it was queued;

// release操作
update synchronization state;
if (state may permit a blocked thread to acquire)
    unblock one or more queued threads;

為了實現上述的操作,需要以下元件的協同工作:原子管理的同步狀態,執行緒的阻塞與喚醒,以及佇列

同步狀態

AQS使用volatile修飾的int型別變數state儲存同步狀態,並提供getStatesetStatecompareAndSetState方法。

AQS中,state不僅用作表示同步狀態,也是某些同步器實現的計數器,如:Semaphore中允許通過的執行緒數量,ReentrantLock中可重入特性的實現,都依賴於state作為計數器的特性。

早期,Java對long型別變數的原子操作需要藉助內建鎖來完成,效能較差,並且除了CyclicBarrier外,其餘同步器使用32位元的int型別已經能夠滿足需求,因此在AQS誕生初期,state可以使用int型別。

Tips

  • CyclicBarrier通過鎖來實現;

  • Java 1.6中提供了使用long型別的AbstractQueuedLongSynchronizer

  • 注意要區別同步狀態與執行緒在佇列中的狀態。

阻塞與喚醒

早期,執行緒的阻塞與喚醒只能通過Thread.suspendThread.resume實現,但存在競態問題,即一個執行緒先呼叫了Thread.resume後呼叫Thread.suspend,那麼Thread.resume不會產生任何作用。

AQS使用LockSupport.parkLockSupport.unpark實現阻塞與喚醒,特點是如果LockSupport.unpark發生在LockSupport.park前,則此次的LockSupport.park無效。

Tips:無論提前呼叫多少次LockSupport.unpark,都只會使後一次LockSupport.park無效。

CLH佇列

佇列的設計是構建AQS的關鍵,Doug Lea在論文中使用「The heart of」來形容:

The heart of the framework is maintenance of queues of blocked threads, which are restricted here to FIFO queues.

Doug Lea參考了CLH的設計, 保留了基本的設計,由前驅節點做阻塞與喚醒的控制,但是在佇列的選擇上做出了改變,AQS選擇雙向連結串列來實現佇列,節點中新增了prevnext指標。

新增prev指標主要是為了實現取消功能,而next指標的加入可以方便的實現喚醒後繼節點。

AQS原始碼分析

再次強調,本文基於Java 11完成,與Java 8的原始碼存在差異,如,操作同步狀態state時,Java 8藉助了UnSafe,而Java 11中使用了VarHandle。另外,本文只討論AQS的獨佔(EXCLUSIVE)模式因此會跳過共用(SHARED)模式

佇列的結構

有了《AQS的前世,從1990年的論文說起》的鋪墊,再結合Doug Lea論文中的描述,我們可以很容易想象到AQS中佇列節點的結構:執行緒狀態,前驅節點指標,後繼節點指標以及用於儲存執行緒的變數。事實也和我們的猜想十分接近:

static final class Node {

  volatile int waitStatus;
  
  volatile Node prev;
  
  volatile Node next;
  
  volatile Thread thread;
  
  Node nextWaiter;
}

注意,NodewaitStatus表示執行緒在佇列中的狀態,AQS的state表示同步器的狀態。Node中定義了waitStatus的5種狀態:

  • CANCELLED:1,執行緒獲取鎖的請求已經取消;

  • SIGNAL :-1,節點釋放後,需要喚醒後繼節點

  • CONDITION:-2,節點處於條件佇列中;

  • PROPAGATE:-3,共用(SHARED)模式下使用;

  • 0,初始化Node時的預設值。

AQS的實現中,並不是後繼節點「監聽」前驅節點的狀態,來決定自身是否持有鎖,而是通過前驅節點釋放鎖,並主動喚醒後繼節點來實現排隊的

AQS的結構

AQS的結構就更加簡單了:

private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;

static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;

總共4個成員變數,除了我們意料之中的,佇列的頭尾節點和AQS的同步狀態外,還有SPIN_FOR_TIMEOUT_THRESHOLD。看名字會有些誤解,以為是自旋的閾值,實際上並不是,AQS提供了帶有超時時間的方法,例如doAcquireNanos方法:

private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
  final long deadline = System.nanoTime() + nanosTimeout;
  final Node node = addWaiter(Node.EXCLUSIVE);
  for (;;) {
    final Node p = node.predecessor();
    nanosTimeout = deadline - System.nanoTime();
    if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD) {
      LockSupport.parkNanos(this, nanosTimeout);
    }
}

可以看到只有在剩餘的nanosTimeout大於SPIN_FOR_TIMEOUT_THRESHOLD時,才會呼叫LockSupport.parkNanos(this, nanosTimeout)

Tips

  • Java 11中,無論是AQS還是Node中都使用了VarHandle,定義了大量的成員變數,我們跳過這部分;

  • 刪除了doAcquireNanos方法中大部分內容,重點展示nanosTimeoutSPIN_FOR_TIMEOUT_THRESHOLD的關係。

獲取鎖

如果是你,你會如何設計AQS的加鎖過程?

我可能會「按部就班」的構建佇列,並將等待執行緒逐個的加入的佇列中。那Doug Lea是如何設計AQS加鎖過程的呢?

public final void acquire(int arg) {
  if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 
    selfInterrupt();
}

acquire方法中,Doug Lea設計了4步操作,如果僅從名字來看,首先tryAcquire嘗試獲取鎖,如果獲取失敗,則通過addWaiter加入等待,然後呼叫acquireQueued方法進入排隊狀態,最後是通過呼叫selfInterrupt方法使當前執行緒中斷。先來看AQS中的tryAcquire方法。

protected boolean tryAcquire(int arg) {
  throw new UnsupportedOperationException();
}

AQS中並未給出任何實現,它要求子類必須重寫tryAcquire方法,否則丟擲異常。

addWaiter方法

接著是addWaiter方法:

private Node addWaiter(Node mode) {
  // 註釋1:建立節點,通過acquire進入時mode = Node.EXCLUSIVE
  Node node = new Node(mode);
  for (;;) {
    // 註釋2:獲取尾節點
    Node oldTail = tail;
    if (oldTail != null) {
      // 註釋5:新增新的尾節點
      node.setPrevRelaxed(oldTail);
      if (compareAndSetTail(oldTail, node)) {
        oldTail.next = node;
        return node;
      }
    } else {
      // 註釋3:尾節點為空則初始化佇列
      initializeSyncQueue();
    }
  }
}

static final class Node {
  Node(Node nextWaiter) {
    this.nextWaiter = nextWaiter;
    // 可以看做是:this.thread = Thread.currentThread()
    THREAD.set(this, Thread.currentThread());
  }
}

private final void initializeSyncQueue() {
  Node h;
  // 註釋4:建立空節點,作為尾節點
  if (HEAD.compareAndSet(this, null, (h = new Node())))
    tail = h;
}

addWaiter的邏輯並不複雜:

  • 註釋1:建立節點node

  • 註釋2:獲取AQS的尾節點oldTail,並判斷是否存在尾節點;

  • 註釋3:初始化佇列;

  • 註釋4:建立空節點h,作為AQS的頭尾節點;

  • 註釋5:更新AQS的尾節點為node,並與oldTail關聯。

我們知道,只有tryAcquire失敗後,才會呼叫addWaiter方法,也就是說,如果實現了tryAcquire獲取鎖的邏輯,那麼在沒有競爭的場景下,AQS就不會構建等待佇列

另外我注意到在初始化佇列時,Doug Lea為等待佇列新增了一個空的頭節點,我的理解是,這裡使用了處理連結串列的常用技巧:虛擬頭節點

回過頭來看addWaiter做了什麼?它的核心功能是初始化的等待佇列,並返回當前佇列的尾節點

acquireQueued方法

addWaiter建立了等待佇列並返回尾節點後,就進入了acquireQueued方法中:

final boolean acquireQueued(final Node node, int arg) {
  // 是否中斷的標記
  boolean interrupted = false;
  try {
    for (;;) {
      // 註釋1:獲取node的前驅節點,node更名為currentNode更方便我理解
      final Node p = node.predecessor();
      if (p == head && tryAcquire(arg)) {
        setHead(node);
        p.next = null;
        return interrupted;
      }
      // 註釋2:判斷是否需要park當前執行緒
      if (shouldParkAfterFailedAcquire(p, node))
        interrupted |= parkAndCheckInterrupt();
    }
  } catch (Throwable t) {
    cancelAcquire(node);
    if (interrupted)
      selfInterrupt();
    throw t;
  }
}

註釋1的部分,獲取到node的前驅節點p,如果p為頭節點,則當前執行緒直接通過tryAcquire嘗試獲取鎖。如果p不是頭節點的話可以直接呼叫tryAcquire嗎?

答案是不可以,如果p不是頭節點,也就證明當前執行緒不在獲取鎖的第二順位上,前面可能還有若干節點在等待鎖,如果任意節點都直接呼叫tryAcquire,那就失去了acquireQueued方法的意義。

註釋2的部分,p不是頭節點的情況,也就是當前執行緒非第二順位獲取鎖。那麼node就要根據前驅節點的狀態來判斷是否中斷執行了:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  // 獲取前驅節點的狀態,waitStatus初始化的狀態為0
  int ws = pred.waitStatus;
  if (ws == Node.SIGNAL)
    // 註釋2:前驅節點處於Node.SIGNAL狀態
    return true;
  if (ws > 0) {
    do {
      node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    pred.next = node;
  } else {
    // 註釋1:更新前驅節點的狀態為Node.SIGNAL
    pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
  }
  return false;
}

private final boolean parkAndCheckInterrupt() {
  // 暫停執行緒
  LockSupport.park(this);
  return Thread.interrupted();
}

addWaiter的流程中可以看到,處理node的過程中並沒有處理node.waitStatus,此時waitStatus == 0,那麼對於node的前驅節點pred也是一樣的,因次第一次執行shouldParkAfterFailedAcquire方法時,會進入註釋1的部分,並返回false

再次進入acquireQueued的迴圈後,shouldParkAfterFailedAcquire返回true,執行parkAndCheckInterrupt方法,需要注意LockSupport.park(this)會讓執行緒暫停在此處,也就是說如果沒有執行緒喚醒,執行緒會一直停留在此處

至此,AQS的加鎖過程已經結束了,我們畫張圖來回顧下這個過程:

釋放鎖

接著來看解鎖的過程:

public final boolean release(int arg) {
  if (tryRelease(arg)) {
    Node h = head;
    if (h != null && h.waitStatus != 0)
      unparkSuccessor(h);
    return true;
  }
  return false;
}

按照AQS的風格tryRelease必然是要交給子類實現的:

protected boolean tryRelease(int arg) {
  throw new UnsupportedOperationException();
}

果不其然。

假設tryRelease執行成功,接下來會發生什麼?

  • 獲取頭節點h;

  • 判斷頭節點的狀態h.waitStatus != 0

  • 執行unparkSuccessor

來看unparkSuccessor的程式碼:

private void unparkSuccessor(Node node) {
  // node是當前執行緒的前驅節點,也是head節點
  int ws = node.waitStatus;
  if (ws < 0)
    // 處理node的waitStatus
    node.compareAndSetWaitStatus(ws, 0);
  Node s = node.next;
  // 註釋2:從後向前遍歷待喚醒的節點
  if (s == null || s.waitStatus > 0) {
    s = null;
    for (Node p = tail; p != node && p != null; p = p.prev)
      if (p.waitStatus <= 0)
        s = p;
  }
  // 註釋1:喚醒後繼節點
  if (s != null)
    LockSupport.unpark(s.thread);
}

如果一切順利,那麼unparkSuccessor時會跳過註釋2的部分,直接執行註釋1的LockSupport.unpark

不過別忘了,待喚醒的執行緒此時還在acquireQueued方法中阻塞著,喚醒的執行緒會繼續執行acquireQueued中的內容,呼叫tryAcquire獲取鎖,並更新AQS的頭節點

我們設想一個場景:

addWaiter執行到compareAndSetTail(oldTail, node)時呼叫了unparkSuccessor,可能會出現一種情況:

即T1已經與HEAD建立了聯絡,但HEAD卻沒有與T1建立聯絡。因此註釋2中,判斷HEAD節點沒有後繼節點時,會通過TAIL節點,從後向前遍歷等待佇列,查詢待喚醒的執行緒。

好了,AQS的核心原始碼分析到這裡就結束了,至於條件佇列,共用模式等就留給大家自行探索了。

構建互斥鎖

學習完AQS的核心原理後,我們來實踐一下,藉助AQS來構建構建互斥鎖:

public class MutexLock {
  public void lock() {
    sync.acquire(1);
  }
  
  public void unlock() {
    sync.release(0);
  }
  
  private final Sync sync = new Sync();
  
  static class Sync extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int arg) {
      Thread currentThread = Thread.currentThread();
      if(compareAndSetState(0, arg)) {
        setExclusiveOwnerThread(currentThread);
        return true;
      }else {
        return false;
      }
    }
    
    @Override
    protected boolean tryRelease(int arg) {
      if(getState() != 1) {
        return false;
      }
      
      setState(arg);
      return true;
    }
  }
}

通過AQS實現只有基礎功能的互斥鎖還是非常簡單的,甚至在重寫tryAcquire方法時可以不設定獨佔執行緒(雖然在現在也沒起到作用),只是簡單的使用CAS替換掉AQS的state即可:

@Override
protected boolean tryAcquire(int arg) {
  return compareAndSetState(0, arg);
}

當然了,這只是一把「玩具鎖」,還存在許多問題,比如,非上鎖執行緒依舊可以解鎖。其次除了阻塞還排隊外,也不支援諸如可重入等高階特性。

結語

好了,關於AQS的部分就到這裡了。如果你有看過《AQS的前世,從1990年的論文說起》中基於排隊思想自旋鎖的演進過程,並理解了MCS鎖和CLH鎖的實現,那麼理解AQS對你來說是非常容易的,雖然它們看起來是不同的東西,但核心原理是相同的,只是在技術實現上有些許差別。

最後,希望通過AQS的前世和今生,能夠幫助你重新認識AQS,理解Doug Lea設計這樣一個同步器基礎元件的意義。


好了,今天就到這裡了,Bye~~