【一知半解】AQS

2022-07-13 18:00:46

什麼是AbstractQueuedSynchronizer(AQS)

字面意思是抽象佇列同步器,使用一個voliate修飾的int型別同步狀態,通過一個FIFO佇列完成資源獲取的排隊工作,把每個參與資源競爭的執行緒封裝成一個Node節點來實現鎖的分配。

AbstractQueuedSynchronizer原始碼

public abstract class AbstractQueuedSynchronizer 
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    private transient volatile Node head;//連結串列頭
    private transient volatile Node tail;//連結串列尾
    private transient Thread exclusiveOwnerThread;//持有鎖的執行緒
    private volatile int state;//同步狀態,0表示當前沒有執行緒獲取到鎖
    static final class Node {//連結串列的Node節點類
      volatile int waitStatus;//當前節點在佇列中的狀態
      volatile Node prev;//前置節點
      volatile Node next;//後置節點
      volatile Thread thread;//當前執行緒
    }
}

AQS同步佇列的基本結構

Node.waitStatus的說明

狀態值 描述
0 節點預設的初始值
SIGNAL=-1 執行緒已經準備好,等待釋放資源
CANCELLED=1 獲取鎖的請求執行緒被取消
CONDITION=-2 節點在佇列中,等待喚醒

state為什麼要用volatile修飾?

  1. 可見性,一個執行緒對變數的修改可以立即被別的執行緒感知到
  2. 有序性,禁止指令重排

AQS獲取鎖步驟

  public final void acquire(int arg) {
      if (!tryAcquire(arg) &&
          acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
          selfInterrupt();
  }
  1. 當一個執行緒獲取鎖時,首先判斷state狀態值是否為0
  2. 如果state==0,則通過CAS的方式修改為非0狀態
  3. 修改成功,則表明獲取鎖成功,執行業務程式碼
  4. 修改失敗,則把當前執行緒封裝為一個Node節點,加入到佇列中並掛起當前執行緒
  5. 如果state!=0,則把當前執行緒封裝為一個Node節點,加入到佇列中並掛起當前執行緒

AQS獲取鎖過程

首先呼叫tryAcquire去修state的狀態值,成功就獲取當前鎖;失敗則加入當前等待佇列中,然後掛起執行緒。

tryAcquire

AQS的原始碼中tryAcquire是一空實現,需要它的子類去實現這個空方法。因為在AQS中雖然公平鎖非公平鎖的都是基於一個CLH去實現,但是在獲取鎖的過程中略有不同。

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

公平鎖FairSync#tryAcquire

protected final boolean tryAcquire(int acquires) {
            //獲取當前執行緒
            final Thread current = Thread.currentThread();
            int c = getState();//獲取同步器的狀態
            if (c == 0) {//當前沒有執行緒獲取到鎖
                //首先判斷祖宗節點的執行緒是否當前執行緒一樣
                if (!hasQueuedPredecessors() &&
                    //更改state的狀態值為非0
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //如果鎖持有者的執行緒是當前執行緒,則可放行,鎖的重入
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }
/**
* 判斷祖宗節點的執行緒是否當前執行緒一樣
* 傀儡節點的下個節點
*/
public final boolean hasQueuedPredecessors() {
    Node t = tail;
    Node h = head;
    Node s;
    //頭節點的下個節點所持有的執行緒是否與當前執行緒相同
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

非公平鎖NonfairSync#tryAcquire

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        //通過CAS更改state的狀態值
        if (compareAndSetState(0, acquires)) {
            //把當前執行緒設定為鎖的持有者
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //如果鎖持有者的執行緒是當前執行緒,則可放行,鎖的重入
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
} 


對比後發現,公平鎖先判斷是否有老祖宗節點,如果有則返回false;如果當前執行緒對應的node就是老祖宗節點,則直接去修改state狀態,把state改為非0。

addWaiter

獲取鎖成功的執行緒去執行業務邏輯了,獲取鎖失敗的執行緒則會在佇列中排隊等候,每個等候的執行緒也都不安分的。

private Node addWaiter(Node mode) {
    //把當前執行緒封裝為一個Node節點
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //加入到佇列的尾部
    enq(node);
    return node;
}
  1. 把當前執行緒封裝為一個Node節點
  2. 當第一次執行這個方法時,由於head和tail都還沒有賦值,則pred指向的tail也是空,所以直接直到enq(node)
  3. 當pred指向的tail不為空時,則通過CAS的方式加入到尾部,如果成功直接返回;如果失敗,則進入enq(node)通過自旋的方式加入。
//通過自旋的方式將節點加入到節點的尾部
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

為了操作連結串列的方便,一般都要在連結串列的頭前加入一個傀儡節點,AQS的連結串列也不例外。
先建立一個傀儡節點,並把head、tail均指向它,然後再把node節點加入到尾部後面,移動tail的指向。

acquireQueued

當節點成功加入到連結串列的尾部後,等待被喚醒,然後通過自旋的方式去獲取鎖

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //當前節點的前置節點
            final Node p = node.predecessor();
            //如果前置節點是傀儡節點(head指向傀儡節點),則再次嘗試去獲取鎖
            if (p == head && tryAcquire(arg)) {
                //獲取成功後,則移除之前的傀儡節點,head指向當前node,
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //獲取鎖失敗後,設定node節點的狀態,並掛起當前節點
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
  1. 獲取node節點的前置節點,如果前置節點是head,則再次嘗試去獲取鎖
  2. 設定當前node節點的前置節點狀態為-1(表示後續節點正在等待狀態,預設是0),然後通過自旋的後會進行到parkAndCheckInterrupt掛起當前節點
  3. LockSupport.park(this)執行完事,當前執行緒會一直阻塞到這個地方
  4. 當前喚醒時再次從1開始執行

AQS釋放鎖過程

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

主要是恢復state的值、重置鎖持有都執行緒,然後喚醒掛起的執行緒。

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    //當前執行緒與鎖持有者執行緒不一樣會報錯
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {//重入的次數為0時,則當前執行緒已經沒有重入了,可以清空鎖的持有者
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

恢復state狀態的值,如果重入次數為0時,則清空鎖的持有都為null

private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)//喚醒下個node對應的執行緒
            LockSupport.unpark(s.thread);
    }

設定head指向的node節點的watiStatus的狀態值,然後找到下個節點對應的執行緒並喚醒。