ReentrantLock、AQS 原始碼分析

2020-09-25 11:00:37

如果 把ReentrantLock比做一個人的話,那麼 AQS 就是他的靈魂。離開 AQS 談論鎖都是耍流氓

一.AQS使用方式和其中的設計模式

繼承,模板方法設計模式

二.重要引數

  1. private volatile int state; 記錄當前鎖是否有執行緒拿到鎖、一個執行緒進入鎖的重入數。如果是0,代表沒有任何執行緒進入鎖。如果是 n,n>0 那麼代表有個執行緒重入了 n 次
  2. AbstractOwnableSynchronizer: private transient Thread exclusiveOwnerThread:當前拿鎖的執行緒
  3. volatile int waitStatus:競爭失敗的執行緒會打包成Node放到同步佇列,Node可能的狀態裡:
  • CANCELLED(cancelled 1):該節點的執行緒可能由於超時或被中斷而處於被取消(作廢)狀態,一旦處於這個狀態,節點狀態將一直處於CANCELLED(作廢),因此應該從佇列中移除.
  • SIGNAL(signal -1):拿到鎖的節點(head)和未拿到鎖正常等待的節點 waitState 都是 signal 。當前節點為SIGNAL時,後繼節點會被掛起,因此在當前節點釋放鎖或被取消之後必須被喚醒(unparking)其後繼結點. 如:如果head 釋放鎖以後,就會判斷 head.next 是否是 signal,是的話喚醒。
  • CONDITION(condition -2) :當前節點處於等待佇列
  • PROPAGATE(propagate -3):共用,表示狀態要往後面的節點傳播
    0,表示初始狀態(拿完鎖的狀態)
  1. private transient volatile Node head:等待佇列的頭
  2. private transient volatile Node tail:等待佇列的尾

三.瞭解其中的方法

1.模板方法:

   獨佔式獲取

   accquire
   acquireInterruptibly
   tryAcquireNanos

   共用式獲取

   acquireShared
   acquireSharedInterruptibly
   tryAcquireSharedNanos

   獨佔式釋放鎖

   release

   共用式釋放鎖

   releaseShared

2.需要子類覆蓋的流程方法

   獨佔式獲取 tryAcquire
   獨佔式釋放 tryRelease
   共用式獲取 tryAcquireShared
   共用式釋放 tryReleaseShared
   這個同步器是否處於獨佔模式 isHeldExclusively

3.同步狀態state:

getState:獲取當前的同步狀態
setState:設定當前同步狀態
compareAndSetState 使用CAS設定狀態,保證狀態設定的原子性

三、原始碼

1.lock

實現類原始碼:ReentrantLock 為例

final void lock() {
            acquire(1);
        }

AbstractQueuedSynchronizer 原始碼

//p1197  
 public final void acquire(int arg) {
 //tryAcquire 為需要覆蓋,需要子類實現的方法
 //EXCLUSIVE表示正在一個獨佔模式下等待
        //如果一個執行緒執行 tryAcquire 恰好拿到了鎖,那麼就不再執行acquireQueued了。也不會放到待執行的佇列了
        if (!tryAcquire(arg) &&
        //addWaiter 下面 p605
        //acquireQueued 下面857
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}

//子類實現tryAcquire 方法範例
protected boolean tryAcquire(int arg) {
			if(compareAndSetState(0,1)) {
				setExclusiveOwnerThread(Thread.currentThread());
				return true;
			}
			return false;
		}

如下為 addWaiter 方法 這一支的操作。

把呼叫 lock 的執行緒包裝成 node 節點放到佇列中,並把這個 node 節點返回
判斷當前佇列有沒有尾節點(佇列是否為空):

  1. 如果不為空那麼就把當前節點掛在佇列末尾(進行雙連結串列的一些操作),設定當前節點為尾節點
  2. 如果為空,呼叫 enq 的方法,構建一個『空節點node ⇆ 當前執行緒 node』這樣的雙連結串列,enq 中使用自旋 CAS 來新增,防止有其他執行緒更改
 //p605
 //mode == null
private Node addWaiter(Node mode) {
// 使用當前執行緒 構造一個 node,構造器如下  p505
        Node node = new Node(Thread.currentThread(), mode); 
//如果當前有尾節點(等待佇列不為空,非初始化)
        Node pred = tail;
        if (pred != null) {
//    當前節點的前驅節點地址賦值
            node.prev = pred;
//   使用 CAS 把當新增進來的節點設定成尾節點
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
// 如果當前沒有尾節點(等待佇列為空,初始化佇列的時候) p583
        enq(node);
        return node;
    }
    
//p505    
    Node(Thread thread, Node mode) {    
            this.nextWaiter = mode;
            this.thread = thread;
        }

如果尾節點為空(那麼head也是空的,整個佇列都是空的),才會執行enq。
所以此方法最後會形成一個『空節點node ⇆ 當前執行緒 node』,這樣一個雙連結串列

//p583
private Node enq(final Node node) {
//自旋
        for (;;) {
            Node t = tail;
            //佇列初始化
            if (t == null) { 
                //cas 方式設定 空節點 到 head 中,且 tail 與 head 同步
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
               //當前節點 的 上一個節點指標指向 if 中 造出來的那個  空節點
                node.prev = t;
                //cas 設定當前節點為尾節點
                if (compareAndSetTail(t, node)) {
                    //if 中 造出來的那個空節點的下一個節點指向當前節點
                    t.next = node;
                    return t;
                }
            }
        }
    }

如下為acquireQueued 的操作

head 節點為已經拿到鎖的執行緒
此方法基本思路
1.先拿到當前節點的上一個節點,如果上一個節點為head且當前節點嘗試拿了一下鎖拿到了。那麼就把當前節點設定成head,上一個節點脫鉤,返回 false
2.否則就會執行shouldParkAfterFailedAcquire、parkAndCheckInterrupt
3.感覺如果p == head && tryAcquire(arg)條件不滿足迴圈將永遠無法結束,當然不會出現死迴圈,奧祕在於parkAndCheckInterrupt會把當前執行緒掛起,從而阻塞住執行緒的呼叫棧(阻塞住的意思就是走到這個方法parkAndCheckInterrupt,就不動了,卡住了)。

//p857
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                //如果他的前驅節點是頭結點,且已經執行完畢(state 已經復位),執行tryAcquire後被我當前執行緒搶佔了鎖,那麼執行把當前節點設定為head的操作
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
        //下面的兩個方法主要是檢查執行緒的狀態,是否被中斷。且阻塞當前執行緒(parkAndCheck) 
                //shouldParkAfterFailedAcquire p795
                //parkAndCheckInterrupt p835
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                //如果發生異常,會執行此段程式碼,取消加入佇列
                cancelAcquire(node);
        }
    }

#p795

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            
          //This node has already set status asking a release to signal it, so it can safely park.
         //規則1:該方法首先檢查前趨結點的waitStatus位,如果為SIGNAL,表示前趨結點會通知它,那麼它可以放心大膽地掛起了,返回 true 後呼叫acquireQueued方法的 parkAndCheckInterrupt)將導致執行緒阻塞    
            return true;
        if (ws > 0) {
           //Predecessor was cancelled. Skip over predecessors and  indicate retry.
           //規則2:如果前趨結點是一個被取消的結點怎麼辦呢?那麼就向前遍歷跳過被取消的結點,直到找到一個沒有被取消的結點為止,將找到的這個結點作為它的前趨結點,將找到的這個結點的waitStatus位設定為SIGNAL,返回false表示執行緒不應該被掛起。然後進去acquireQueued方法迴圈   
           //接下來acquireQueued迴圈會出現兩種結果  
           //1.因為在這一步幹掉了前一個被取消了的節點,把前前一個節點變成了前一個節點,恰巧前一個節點還是頭結點,所以當前節點嘗試tryAcquire可能會獲取到鎖。
           //2.前前一個節點不是頭結點,那麼就會再次進入到該方法。此次前一個節點肯定是SIGNAL狀態,所以當前節點被毫無疑問的掛起(阻塞)
            
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
             //如果前繼節點狀態為非SIGNAL、非CANCELLED,則設定前繼的狀態為SIGNAL,返回false後進入acquireQueued的無限迴圈
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
p835
//這個方法的主要任務就是阻斷執行緒
private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

總結

  • 子類實現 tryAcquire方法來具體操作拿鎖的操作。
  • 使用者呼叫acquire 方法,acquire 首先呼叫tryAcquire看看是否能拿到鎖。如果拿到了那麼返回 true,程式也能順利往下執行,同時業務程式碼也會被執行
  • 如果拿不到,那麼就把當前執行緒封裝成一個 Node,通過呼叫addWaiter放到一個雙連結串列當中。
  • addWaiter還會把這個node 返回,在acquireQueued方法會進行自旋,如果當前節點的前一個節點為頭結點,本節點會再一次呼叫tryAcquire嘗試進行拿鎖的操作。
  • 如果拿不到接著執行,在shouldParkAfterFailedAcquire 中把前一個節點的 waitStatus 由預設的 0 改成 SIGNAL(-1),當前節點作為尾節點waitStatus狀態還是 0。
  • 改完waitStatus狀態之後,執行parkAndCheckInterrupt 阻塞當前執行緒。注:如果shouldParkAfterFailedAcquire返回false,&& 會短路,parkAndCheckInterrupt 阻塞當前執行緒。注:如果shouldParkAfterFailedAcquire就不執行了。但是acquireQueued是一個自旋,待執行shouldParkAfterFailedAcquire,waitStatus狀態都改成-1後,還是會執行 parkAndCheckInterrupt來阻塞當前執行緒

2.unlock

public final boolean release(int arg) {
        //tryRelease aqs 的具體實現類重寫
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    
private void unparkSuccessor(Node node) {
    
    //設定 waitstatus 為0
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    
    Node s = node.next;
    //如果下個節點是null或者下個節點被cancelled,就找到佇列最開始的非cancelled的節點
    if (s == null || s.waitStatus > 0) {
        s = null;
        //為什麼要從後往前找??原因在addWaiter方法:
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null){
        //釋放 一個被阻塞的執行緒
        LockSupport.unpark(s.thread);
        }
}   

//我們從這裡可以看到,節點入隊並不是原子操作,也就是說,node.prev = pred; compareAndSetTail(pred, node) 這兩個地方可以看作Tail入隊的原子操作
//但是此時pred.next = node;還沒執行,如果這個時候執行了unparkSuccessor方法,就沒辦法從前往後找了,所以需要從後往前找。
//還有一點原因,在產生CANCELLED狀態節點的時候,先斷開的是Next指標,Prev指標並未斷開,因此也是必須要從後往前遍歷才能夠遍歷完全部的Node。
private Node addWaiter(Node mode) {
	Node node = new Node(Thread.currentThread(), mode);
	Node pred = tail;
	if (pred != null) {
		node.prev = pred;
		if (compareAndSetTail(pred, node)) {
			pred.next = node;
			return node;
		}
	}
	enq(node);
	return node;
}

呼叫 LockSupport.unpark(s.thread); 釋放當前執行緒阻塞,程式順序往下執行業務程式碼。

四、ReentrantLock原始碼分析

1.構造器

引數為空或 false 是非公平鎖。為 true 是公平鎖。

    public ReentrantLock() {
        sync = new NonfairSync();
    }

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

2. 非公平鎖獲取鎖

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    
    final void lock() {
        //如果直接修改了 state ,那麼就證明現在沒有其他資源在獲得鎖,本執行緒直接拿到鎖。(原來鎖就是操作了 一個 volitale 的 state)
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    //實現 aqs 的 tryAcquire
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}


final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    //如果是0,證明還沒有執行緒拿到這個鎖,直接 cas 設定 state
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            //記錄當前拿到鎖的執行緒是哪個
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //鎖重入的時候
    //如果當前拿鎖的執行緒是 已經拿到鎖的執行緒,那麼增加重入次數
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    //如果當前鎖已經被獲取,且拿鎖的執行緒不是當前執行緒,那麼返回 false,tryAcquire 失敗
    return false;
}

3.公平鎖獲取鎖

    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            //如果目前沒有執行緒拿鎖 
            if (c == 0) {
                //hasQueuedPredecessors 是 公平鎖的關鍵
                //對下面的 if 換一種好理解的寫法
                /**
                    if(!hasQueuedPredecessors()){
                        if(compareAndSetState(0, acquires)){
                            setExclusiveOwnerThread(current);
                            return true;
                        }
                    }
                */
                //如果 hasQueuedPredecessors 返回 false,那麼證明等待佇列中沒有執行緒在排隊。 那麼執行 compareAndSetState 拿鎖
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

//hasQueuedPredecessors是公平鎖加鎖時判斷等待佇列中是否存在有效節點的方法。
//如果返回False,說明當前執行緒可以爭取共用資源;如果返回True,說明佇列中存在有效節點,當前執行緒必須加入到等待佇列中。
//判斷佇列中是否有 有效節點,以真實執行緒節點 入隊為準。不以虛節點(new Node())為準
public final boolean hasQueuedPredecessors() {

Node t = tail; 
Node h = head;
//head 的下一個元素
Node s;

//1.如果 h != t 為false (h == t 為 true) ,那麼直接返回 false,在tryAcquire 中執行 set state 操作
//2.(s = h.next) == null && h != t  是走到了 enq ①處,此時 tail == node,head == new Node()。
//  head 的 next 還未指向 tail,但是 node 已入  tail。所以公平起見當前執行緒就不能再搶佔鎖了。   返回 true,使當前執行緒不會再設定 state  
//3.如果第二次拿鎖的執行緒 和第一次拿執行緒的執行緒不是同一個執行緒,那麼第二個執行緒應該讓渡第一個執行緒(如果是同一個執行緒,那麼兩次拿鎖就不分先後)
return h != t &&
    ((s = h.next) == null || s.thread != Thread.currentThread());
}

//分析此段程式碼需從 enq 方法看起
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                //①
                t.next = node;
                return t;
            }
        }
    }
}

總結:

  1. NonfairSync 和 fairSync 的根本區別在於tryAcquire 中 當 state == 0 時(即當前鎖沒有被任何一個執行緒佔有),當前執行緒是直接嘗試拿鎖(非公平)還是檢視等待佇列是否有元素來決定去排隊還是嘗試拿鎖(當等待佇列有元素就排隊,沒有元素就拿鎖)。
  2. 公平鎖直接的提現 方法是 hasQueuedPredecessors
  3. tryAcquire 有兩個地方呼叫。1)lock 的時候直接呼叫一次。2)加入佇列後再次嘗試呼叫。
final void lock() {
            acquire(1);
        }

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}    

4.釋放鎖

protected final boolean tryRelease(int releases) {
    //可重入鎖 state -1 
    int c = getState() - releases;
    //如果解鎖執行緒 和 持鎖執行緒不是同一個  拋異常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    
    //如果當前執行緒的鎖全部解完,那麼就代表 此執行緒已經釋放了鎖
    if (c == 0) {
        //釋放鎖成功
        free = true;
        //持鎖執行緒 釋放
        setExclusiveOwnerThread(null);
    }
    //設定 state 數量
    setState(c);
    return free;
}