JUC同步鎖原理原始碼解析二--ReentrantReadWriteLock

2023-06-16 06:00:48

JUC同步鎖原理原始碼解析二----ReentrantReadWriteLock

1.讀寫鎖的來源

​ 在開發場景下,對於寫操作我們為了保證原子性所以需要上鎖,但是對於讀操作,由於其不改變資料,只是單純對資料進行讀取,但是每次都上一把互斥鎖,阻塞所有請求。這個明顯不符合讀多寫少的場景。所以將鎖分為兩把:讀鎖和寫鎖。

2.讀寫鎖的底層實現

​ 讀寫鎖的底層實現依舊依賴於AQS。具體底層實現,請檢視上一篇文章的介紹

2.AQS原始碼

Node節點

 static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
 
        static final int PROPAGATE = -3;

        volatile int waitStatus;

        volatile Node prev;

        volatile Node next;
       
        volatile Thread thread;

        Node nextWaiter;
}

AbstractQueuedSynchronizer類

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
 	private transient volatile Node head;

    /**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;

    /**
     * The synchronization state.
     */
    private volatile int state;//最重要的一個變數
       
}

ConditionObject類

public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;
}

accquire方法

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&//嘗試獲取鎖
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//如果獲取鎖失敗,新增到佇列中,由於ReentrantLock是獨佔鎖所以節點必須是EXCLUSIVE型別
        selfInterrupt();//新增中斷標識位
}

addWaiter方法

private Node addWaiter(Node mode) {
     Node node = new Node(Thread.currentThread(), mode);//新建節點
     // Try the fast path of enq; backup to full enq on failure
     Node pred = tail;//獲取到尾指標
     if (pred != null) {//尾指標不等於空,將當前節點替換為尾指標
         node.prev = pred;
         if (compareAndSetTail(pred, node)) {//採用尾插法,充分利用時間區域性性和空間區域性性。尾插的節點一般不容易被取消。
             pred.next = node;
             return node;
         }
     }
     enq(node);//cas失敗後執行入隊操作,繼續嘗試
     return node;
 }

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;//獲取尾指標
        if (t == null) { //代表當前佇列沒有節點
            if (compareAndSetHead(new Node()))//將當前節點置為頭結點
                tail = head;
        } else {//當前佇列有節點
            node.prev = t;//
            if (compareAndSetTail(t, node)) {//將當前節點置為尾結點
                t.next = node;
                return t;
            }
        }
    }
}

acquireQueued方法

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();//找到當前節點的前驅節點
            if (p == head && tryAcquire(arg)) {//前驅節點等於頭節點嘗試cas搶鎖。
                setHead(node);//搶鎖成功將當前節點設定為頭節點
                p.next = null; // help GC  當頭結點置空
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&//當佇列中有節點在等待,判斷是否應該阻塞
                parkAndCheckInterrupt())//阻塞等待,檢查中斷標識位
                interrupted = true;//將中斷標識位置為true
        }
    } finally {
        if (failed)//
            cancelAcquire(node);//取消當前節點
    }
}


 private void cancelAcquire(Node node) {
     // Ignore if node doesn't exist
     if (node == null)//當前節點為空直接返回
         return;

     node.thread = null;//要取消了將當前節點的執行緒置為空
     // Skip cancelled predecessors
     Node pred = node.prev;//獲取到當前節點的前驅節點
     while (pred.waitStatus > 0)//如果當前節點的前驅節點的狀態大於0,代表是取消狀態,一直找到不是取消狀態的節點
         node.prev = pred = pred.prev;
     Node predNext = pred.next;//將當前要取消的節點斷鏈

     node.waitStatus = Node.CANCELLED;//將當前節點的等待狀態置為CANCELLED
     // If we are the tail, remove ourselves.
     if (node == tail && compareAndSetTail(node, pred)) {//如果當前節點是尾結點,將尾結點替換為淺語節點
         compareAndSetNext(pred, predNext, null);//將當前節點的下一個節點置為空,因為當前節點是最後一個節點沒有next指標
     } else {
         // If successor needs signal, try to set pred's next-link
         // so it will get one. Otherwise wake it up to propagate.
         int ws;
         if (pred != head &&//前驅節點不等於頭結點
             ((ws = pred.waitStatus) == Node.SIGNAL ||//前驅節點的狀態不等於SIGNAL
              (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&//前驅節點的狀態小於0,並且cas將前驅節點的等待置為SIGNAL
             pred.thread != null) {//前驅節點的執行緒補位空
             Node next = node.next;//獲取當前節點的next指標
             if (next != null && next.waitStatus <= 0)//如果next指標不等於空並且等待狀態小於等於0,標識節點有效
                 compareAndSetNext(pred, predNext, next);//將前驅節點的next指標指向下一個有效節點
         } else {
             unparkSuccessor(node);//喚醒後續節點 條件:1.前驅節點是頭結點 2.當前節點不是signal,在ReentransLock中基本不會出現,在讀寫鎖時就會出現
         }

         node.next = node; // help GC 將參照指向自身
     }
 }

 private void unparkSuccessor(Node node) {
     /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
     int ws = node.waitStatus;//獲取當前節點狀態
     if (ws < 0)//如果節點為負數也即不是取消節點
         compareAndSetWaitStatus(node, ws, 0);//cas將當前節點置為0

     /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
     Node s = node.next;//獲取到下一個節點
     if (s == null || s.waitStatus > 0) {//下一個節點等於空或者下一個節點是取消節點
         s = null;//將s置為空
         for (Node t = tail; t != null && t != node; t = t.prev)//從尾結點遍歷找到一個不是取消狀態的節點
             if (t.waitStatus <= 0)
                 s = t;
     }
     if (s != null)//如果s不等於空
         LockSupport.unpark(s.thread);//喚醒當前節點s
 }

shouldParkAfterFailedAcquire方法


private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;//獲取上一個節點的等待狀態
    if (ws == Node.SIGNAL)//如果狀態為SIGNAL,代表後續節點有節點可以喚醒,可以安心阻塞去
        /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
        return true;
    if (ws > 0) {//如果當前狀態大於0,代表節點為CANCELLED狀態
        /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
        do {
            node.prev = pred = pred.prev;//從尾節點開始遍歷,找到下一個狀態不是CANCELLED的節點。將取消節點斷鏈移除
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
        //這裡需要注意ws>0時,已經找到了一個不是取消狀態的前驅節點。
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//將找到的不是CANCELLED節點的前驅節點,將其等待狀態置為SIGNAL
    }
    return false;
}

cancelAcquire方法

 private void cancelAcquire(Node node) {
     // Ignore if node doesn't exist
     if (node == null)//當前節點為空直接返回
         return;

     node.thread = null;//要取消了將當前節點的執行緒置為空
     // Skip cancelled predecessors
     Node pred = node.prev;//獲取到當前節點的前驅節點
     while (pred.waitStatus > 0)//如果當前節點的前驅節點的狀態大於0,代表是取消狀態,一直找到不是取消狀態的節點
         node.prev = pred = pred.prev;
     Node predNext = pred.next;//將當前要取消的節點斷鏈

     node.waitStatus = Node.CANCELLED;//將當前節點的等待狀態置為CANCELLED
     // If we are the tail, remove ourselves.
     if (node == tail && compareAndSetTail(node, pred)) {//如果當前節點是尾結點,將尾結點替換為淺語節點
         compareAndSetNext(pred, predNext, null);//將當前節點的下一個節點置為空,因為當前節點是最後一個節點沒有next指標
     } else {
         // If successor needs signal, try to set pred's next-link
         // so it will get one. Otherwise wake it up to propagate.
         int ws;
         if (pred != head &&//前驅節點不等於頭結點
             ((ws = pred.waitStatus) == Node.SIGNAL ||//前驅節點的狀態不等於SIGNAL
              (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&//前驅節點的狀態小於0,並且cas將前驅節點的等待置為SIGNAL
             pred.thread != null) {//前驅節點的執行緒補位空
             Node next = node.next;//獲取當前節點的next指標
             if (next != null && next.waitStatus <= 0)//如果next指標不等於空並且等待狀態小於等於0,標識節點有效
                 compareAndSetNext(pred, predNext, next);//將前驅節點的next指標指向下一個有效節點
         } else {
             unparkSuccessor(node);//喚醒後續節點 條件:1.前驅節點是頭結點 2.當前節點不是signal,在ReentransLock中基本不會出現,在讀寫鎖時就會出現
         }

         node.next = node; // help GC 將參照指向自身
     }
 }

unparkSuccessor方法

 private void unparkSuccessor(Node node) {
     /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
     int ws = node.waitStatus;//獲取當前節點狀態
     if (ws < 0)//如果節點為負數也即不是取消節點
         compareAndSetWaitStatus(node, ws, 0);//cas將當前節點置為0

     /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
     Node s = node.next;//獲取到下一個節點
     if (s == null || s.waitStatus > 0) {//下一個節點等於空或者下一個節點是取消節點
         s = null;//將s置為空
         for (Node t = tail; t != null && t != node; t = t.prev)//從尾結點遍歷找到一個不是取消狀態的節點
             if (t.waitStatus <= 0)
                 s = t;
     }
     if (s != null)//如果s不等於空
         LockSupport.unpark(s.thread);//喚醒當前節點s
 }

release方法

public final boolean release(int arg) {
    if (tryRelease(arg)) {//子類實現如何釋放鎖
        Node h = head;//獲取到頭結點
        if (h != null && h.waitStatus != 0)//獲取到頭結點,如果頭結點不為空,等待狀態不為0,喚醒後續節點
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
    int ws = node.waitStatus;//獲取節點的等待狀態
    if (ws < 0)//如果等待狀態小於0,標識節點屬於有效節點
        compareAndSetWaitStatus(node, ws, 0);//將當前節點的等待狀態置為0

    /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
    Node s = node.next;//獲取到下一個節點
    if (s == null || s.waitStatus > 0) {//如果節點是空,或者是取消狀態的節點,就找到一個非取消狀態的節點,將取消狀態的節點斷鏈後由垃圾回收器進行回收
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)//節點不用空
        LockSupport.unpark(s.thread);//喚醒當前等待的有效節點S
}

acquireShared方法

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)//由子類實現
        doAcquireShared(arg);
}

doAcquireShared方法

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);//將共用節點也即讀執行緒入隊並返回
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();//找到節點的前驅節點
            if (p == head) {//如果前驅節點等於頭結點
                int r = tryAcquireShared(arg);//嘗試獲取共用鎖數量
                if (r >= 0) {//如果鎖的數量大於0,表示還有多餘的共用鎖。這裡等於0也需要進一步判斷。由於如果當執行到這裡時,有另外的執行緒釋放了共用鎖,如果不進行判斷,將會導致釋放鎖的執行緒沒辦法喚醒其他執行緒。所以這裡會偽喚醒一個節點,喚醒的節點後續如果沒有鎖釋放,依舊阻塞在當前parkAndCheckInterrupt方法中
                    setHeadAndPropagate(node, r);//將當前節點的等待狀態設定為Propagate。
                    p.next = null; // help GC
                    if (interrupted)//判斷是會否中斷過
                        selfInterrupt();//設定中斷標識位
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&//判斷是否應該阻塞等待
                parkAndCheckInterrupt方法中())//阻塞並檢查中斷標識
                interrupted = true;//重置中斷標識位
        }
    } finally {
        if (failed)//如果失敗
            cancelAcquire(node);//取消節點
    }
}

setHeadAndPropagate方法

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);//將當前節點置為頭結點
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        if (propagate > 0 //可獲取的共用鎖也即讀鎖的數量,對於ReentrantReadWriteLock而言,永遠都是1,所以會繼續喚醒下一個讀執行緒
            || h == null //如果舊的頭結點為空
            || h.waitStatus < 0 ||//頭結點的等待狀態不為0
            (h = head) == null || h.waitStatus < 0) {//舊頭節點不為空並且等待狀態小於0也即是有效節點
            Node s = node.next;//獲取到node的下一個節點
            if (s == null || s.isShared())//如果node的下一個節點為空或者是共用節點
                doReleaseShared();//喚醒下一個執行緒
        }
    }

releaseShared方法

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {//子類實現釋放鎖
        doReleaseShared();//喚醒後續執行緒
        return true;//釋放成功
    }
    return false;//釋放是吧
}

doReleaseShared方法

private void doReleaseShared() {
    /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
    for (;;) {
        Node h = head;//獲取到當前頭結點
        if (h != null && h != tail) {//如果頭結點不為空並且不等於尾結點
            int ws = h.waitStatus;//獲取當前節點的等待狀態
            if (ws == Node.SIGNAL) {//如果狀態為SIGNAL
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//cas將SIGNAL狀態置為0。SIGNAL標識後續有執行緒需要喚醒
                    continue;            // loop to recheck cases
                unparkSuccessor(h);//喚醒後續執行緒
            }
            else if (ws == 0 &&//如果當前狀態為0。表示有執行緒將其置為0
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))//cas將0狀態置為PROPAGATE。在多個共用鎖同時釋放時,方便繼續進行讀傳播,喚醒後續節點
                continue;                // loop on failed CAS
        }
        if (h == head)//如果頭結點沒有改變,證明沒有必要繼續迴圈等待了,直接退出吧,如果頭結點放生變化,可能有其他執行緒釋放了鎖。
            break;
    }
}

總結:AQS提供了統一的模板,對於如何入隊出隊以及執行緒的喚醒都由AQS提供預設的實現,只需要子類實現自己上鎖和解鎖的邏輯。

3.ReentrantReadWriteLock

基本使用


import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class RWDictionary {
    private final Map<String, String> m = new TreeMap<String, String>();
    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    private final Lock r = rwl.readLock();
    private final Lock w = rwl.writeLock();
 
    public String get(String key) {
      r.lock();
      try { return m.get(key); }
      finally { r.unlock(); }
    }
    public String[] allKeys() {
      r.lock();
      try {
          return (String[])m.keySet().toArray();
      }
      finally { r.unlock(); }
    }
    public String put(String key, String value) {
      w.lock();
      try { return m.put(key, value); }
      finally { w.unlock(); }
    }
    public void clear() {
      w.lock();
      try { m.clear(); }
      finally { w.unlock(); }
    }
}

Sync類

abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 6317671515068378041L;

        /*
         * Read vs write count extraction constants and functions.
         * Lock state is logically divided into two unsigned shorts:
         * The lower one representing the exclusive (writer) lock hold count,
         * and the upper the shared (reader) hold count.
         */

        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

        /** Returns the number of shared holds represented in count  */
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }//高16位元代表共用鎖也即讀鎖
        /** Returns the number of exclusive holds represented in count  */
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }//低16位元代表獨佔鎖也即寫鎖
}

公平鎖的FairSync類實現

//讀寫鎖中公平鎖的實現:如果後續佇列中有等待的節點,那麼進行排隊。但是這樣的效能並不高,可能造成寫飢餓。 
static final class FairSync extends Sync {
        private static final long serialVersionUID = -2274990926593161451L;
        final boolean writerShouldBlock() {
            return hasQueuedPredecessors();
        }
        final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }
    }

//如果佇列中有節點在排隊,那麼就進行等待
 public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

非公平鎖的NonfairSync類實現

//非公平鎖的實現相對來說效能較高,不阻塞,可以直接搶鎖,防止寫飢餓。同時有寫執行緒排隊,那麼後續的所有讀操作都需要進行排隊
static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -8159625535654395037L;
        final boolean writerShouldBlock() {
            return false; // writers can always barge //非公平鎖寫鎖永遠不阻塞,防止寫飢餓
        }
        final boolean readerShouldBlock() {
            /* As a heuristic to avoid indefinite writer starvation,
             * block if the thread that momentarily appears to be head
             * of queue, if one exists, is a waiting writer.  This is
             * only a probabilistic effect since a new reader will not
             * block if there is a waiting writer behind other enabled
             * readers that have not yet drained from the queue.
             */
            return apparentlyFirstQueuedIsExclusive();
        }
}

//如果佇列中有任務並且不是共用節點也即不是讀操作,那麼返回true,進行阻塞
 final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) != null &&//頭結點不等於空
            (s = h.next)  != null &&//頭結點的下一個節點補位空
            !s.isShared()         &&//並且當前節點不是共用節點,也即讀執行緒
            s.thread != null;//執行緒不為空
    }

寫鎖的lock方法:

public void lock() {
    sync.acquire(1);
}

//同樣呼叫acquire的方法,請參考上文AQS的acquire方法的實現。同樣都是獲取失敗進行排隊

//子類獲取鎖的過程
protected final boolean tryAcquire(int acquires) {
    /*
             * Walkthrough:
             * 1. If read count nonzero or write count nonzero
             *    and owner is a different thread, fail.
             * 2. If count would saturate, fail. (This can only
             *    happen if count is already nonzero.)
             * 3. Otherwise, this thread is eligible for lock if
             *    it is either a reentrant acquire or
             *    queue policy allows it. If so, update state
             *    and set owner.
             */
    Thread current = Thread.currentThread();//獲取當前執行緒
    int c = getState();//獲取當前state狀態值
    int w = exclusiveCount(c);//獲取讀鎖的數量,這裡doug lea一直都是使用二進位制的風格
    if (c != 0) {//代表持有鎖
        // (Note: if c != 0 and w == 0 then shared count != 0)
        if (w == 0 || current != getExclusiveOwnerThread())//如果寫鎖為0,或則當前執行緒不是寫鎖的持有執行緒
            return false;//返回false
        if (w + exclusiveCount(acquires) > MAX_COUNT)//當前鎖重入次數大於最大值,丟擲異常
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        setState(c + acquires);//直接設定狀態,代表鎖重入次數加1
        return true;//返回獲取鎖成功
    }
    if (writerShouldBlock() ||//寫鎖是否應該阻塞
        !compareAndSetState(c, c + acquires))//cas嘗試獲取鎖
        return false;
    setExclusiveOwnerThread(current);//cas搶鎖成功,直接將寫鎖的持有執行緒設定為當前執行緒
    return true;//返回true代表獲取鎖成功
}

writerShouldBlock的實現

//writerShouldBlock的實現請參考上面的公平鎖和非公平鎖的實現

寫鎖的unlock方法:

public void unlock() {
    sync.release(1);
}

//這裡是AQS的模板方法
public final boolean release(int arg) {
    if (tryRelease(arg)) {//子類實現如何釋放鎖
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())//如果不是獨佔鎖也即寫鎖,直接拋異常
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;//將鎖的數量減少
    boolean free = exclusiveCount(nextc) == 0;//如果獨佔鎖也即寫鎖數量為0,標識已經釋放。大於1是因為鎖重入的問題
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}

讀鎖的lock方法:

public void lock() {
    sync.acquireShared(1);
}

//acquireShared方法請看上面AQS的實現
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}


protected final int tryAcquireShared(int unused) {
    /*
             * Walkthrough:
             * 1. If write lock held by another thread, fail.
             * 2. Otherwise, this thread is eligible for
             *    lock wrt state, so ask if it should block
             *    because of queue policy. If not, try
             *    to grant by CASing state and updating count.
             *    Note that step does not check for reentrant
             *    acquires, which is postponed to full version
             *    to avoid having to check hold count in
             *    the more typical non-reentrant case.
             * 3. If step 2 fails either because thread
             *    apparently not eligible or CAS fails or count
             *    saturated, chain to version with full retry loop.
             */
    Thread current = Thread.currentThread();//獲取當前執行緒
    int c = getState();//獲取當前state狀態
    if (exclusiveCount(c) != 0 &&//如果獨佔鎖也即寫鎖的數量不等於0
        getExclusiveOwnerThread() != current)//獨佔鎖的擁有執行緒不是當前執行緒
        return -1;//直接返回-1,獲取失敗
    int r = sharedCount(c);//獲取共用鎖的數量
    if (!readerShouldBlock() &&//讀鎖是否應該被阻塞
        r < MAX_COUNT &&//讀鎖數量是否小於最大數量
        compareAndSetState(c, c + SHARED_UNIT)) {//cas將讀鎖+1
        if (r == 0) {//當前共用鎖也即讀鎖數量為0
            firstReader = current;//標記為第一個讀執行緒
            firstReaderHoldCount = 1;//將第一個讀執行緒持有鎖數量置為1,主要為了判斷讀鎖的重入
        } else if (firstReader == current) {//第一個執行緒讀等於當前執行緒
            firstReaderHoldCount++;//第一個讀執行緒持有鎖數量
        } else {
            HoldCounter rh = cachedHoldCounter;//先讀取到最後一個執行緒持有共用鎖的數量
            if (rh == null || rh.tid != getThreadId(current))//如果為空或者最後一個執行緒不是當前執行緒
                cachedHoldCounter = rh = readHolds.get();//從快取中獲取當前執行緒持有讀鎖的數量
            else if (rh.count == 0)//如果讀鎖持有數量為0
                readHolds.set(rh);//將值設定並儲存
            rh.count++;//持有執行緒數+1
        }
        return 1;//返回1標識獲取鎖成功
    }
    return fullTryAcquireShared(current);
}

final int fullTryAcquireShared(Thread current) {
    /*
             * This code is in part redundant with that in
             * tryAcquireShared but is simpler overall by not
             * complicating tryAcquireShared with interactions between
             * retries and lazily reading hold counts.
             */
    HoldCounter rh = null;//
    for (;;) {
        int c = getState();//獲取當前狀態值
        if (exclusiveCount(c) != 0) {//如果持有寫鎖的執行緒數補位0
            if (getExclusiveOwnerThread() != current)//持有寫鎖的執行緒不是當前執行緒
                return -1;
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
        } else if (readerShouldBlock()) {//讀鎖是否應該被阻塞
            // Make sure we're not acquiring read lock reentrantly
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        if (sharedCount(c) == MAX_COUNT)//如果讀執行緒的數量等於最大數量,丟擲異常
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {//cas嘗試搶鎖
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

讀鎖的unlock方法

public void unlock() {
    sync.releaseShared(1);
}

//詳情請檢視上面AQS的實現
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {//如果當前執行緒是第一個讀執行緒
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)//當前第一個讀執行緒持有的讀鎖的數量是1
            firstReader = null;//直接置位空
        else
            firstReaderHoldCount--;//否則將持有讀鎖的數量減1
    } else {
        HoldCounter rh = cachedHoldCounter;//先獲取最後一個讀執行緒的持有資訊
        if (rh == null || rh.tid != getThreadId(current))//如果rh為空或者最後一個讀執行緒不是當前執行緒
            rh = readHolds.get();//從快取中獲取當前執行緒的持有鎖的資訊
        int count = rh.count;//獲取到數量
        if (count <= 1) {//如果持有讀鎖的數量小於等於1
            readHolds.remove();//直接清空快取
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;//減去持有讀鎖的數量
    }
    for (;;) {
        int c = getState();//獲取當前狀態
        int nextc = c - SHARED_UNIT;//將讀鎖的數量減1
        if (compareAndSetState(c, nextc))//cas原子性賦值
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;//如果為0表示當前執行緒已經釋放了所有讀鎖。
    }
}

4.留言

本文章只是JUC 中AQS的一部分,後續的文章會對基於AQS鎖實現的子類進行拓展講解,以上文章內容基於個人以及結合別人文章的理解,如果有問題或者不當之處歡迎大家留言交流。由於為了保證觀看流暢性,其中一部分原始碼有重複的地方。請見諒