ReentrantLock介紹及原始碼解析

2023-02-08 18:00:36

ReentrantLock介紹及原始碼解析

一、ReentrantLock介紹

  • ReentrantLock是JUC包下的一個並行工具類,可以通過他顯示的加鎖(lock)和釋放鎖(unlock)來實現執行緒的安全存取,ReentrantLock還可以實現公平鎖和非公平鎖,並且其與synchronized的作用是一致的,區別在於加鎖的底層實現不一樣,寫法上也不一樣,具體異同可以參見下圖:

二、ReentrantLock的原始碼簡析

1、原始碼分析

  • ReentrantLock(下面簡稱RL)就是AQS獨佔鎖的一個典型實現,其通過維護state變數的值來判斷當前執行緒是否能夠擁有鎖,如果通過cas將state成功從0變成1表示爭用資源成功,否則表示爭用失敗,進入CLH佇列,通過CLH佇列來維護那些暫時沒搶佔到鎖資源的執行緒;其內部維護了一個名為Sync的內部類來繼承AQS,又因為RL既可以支援公平鎖也可以支援非公平鎖,所以其內部還維護了兩個內部類FairSync和NonfairSync來繼承Sync,通過他們來實現AQS的模板方法從而實現加鎖的過程;類的關係圖如下:

  • 公平鎖和非公平鎖在原始碼層的兩點區別:

    1、非公平上來直接搶鎖

    2、當state=0時,非公平直接搶,公平鎖還會判斷佇列還有沒有前置節點

2、lock方法的原始碼跟蹤

下面就讓我們跟蹤RL的lock()和unLock()原始碼來看看程式碼級別是怎麼實現的吧!

需要注意的是,本文跟蹤的是非公平鎖的加解鎖過程,公平鎖的實現大體一致,當原始碼中有與公平鎖的顯著差別時我會通過註釋給出解釋

  • 試用例如下
public static void main(String[] args) throws InterruptedException {
    long start = System.currentTimeMillis();
    List<Thread> list = new ArrayList<>();
    ReentrantLock lock = new ReentrantLock();
    for (int i = 0; i < 1000; i++) {
        Thread thread = new Thread(()-> {
            for (int j = 0; j < 1000; j++) {
                // 解鎖
                lock.lock();
                count++;
                // 釋放鎖
                lock.unlock();
            }
        });
        list.add(thread);
    }
    for (Thread thread : list) {
        thread.start();
    }
    for (Thread thread : list) {
        thread.join();
    }
    System.out.println("auto.count = " + count + "耗時:" + (System.currentTimeMillis() -start));
}

(1)、lock()的原始碼跟蹤與解析

跟蹤lock.lock()發現其呼叫的是內部類Sync的lock()方法,該方法是一個抽象方法,具體實現由FairSync和NonfairSync實現,由於我們構造RL時呼叫的是無參建構函式,所以這裡會直接進入NonfairSync的lock()方法;具體實現程式碼和註釋如下:

/**
 * java.util.concurrent.locks.ReentrantLock.NonfairSync#lock()
 */
final void lock() {
    // 由於是非公平鎖所以這裡上來直接爭搶資源,嘗試通過CAS操作將state的值由0變成1
    if (compareAndSetState(0, 1)) 
        // 如果成功將state值變成1表示爭搶鎖成功,設定當前擁有獨佔存取權的執行緒。
        setExclusiveOwnerThread(Thread.currentThread());
    else
        // 爭搶失敗再進入與公平鎖一樣的排隊邏輯
        acquire(1);
}

tips:

1、上面的compareAndSetState方法也是由AQS提供的,裡面藉助Unsafe實現了對state的cas操作更新

2、setExclusiveOwnerThread也可以理解成由AQS提供(其實是AQS的父類別,不過不影響理解),給exclusiveOwnerThread變數賦值,exclusiveOwnerThread表示當前正在擁有鎖的執行緒

3、acquire方法同樣由AQS提供,其內部實現也是lock環節比較關鍵的程式碼,下面我會詳細解釋

(2)、acquire()的原始碼跟蹤與解析

acquire方法的原始碼如下:

/**
 *  java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire(int)
 */
public final void acquire(int arg) {
    /**
     * 1、嘗試獲取鎖;如果成功此方法結束,當前執行緒執行同步程式碼塊
     * 2、如果獲取失敗,則構造Node節點並加入CLH佇列
     * 3、然後繼續等待鎖
     */
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 如果獲取鎖失敗,新增CLH佇列也失敗,那麼直接中斷當前執行緒
        selfInterrupt();
}

tips:

1、tryAcquire方法是AQS的一個模板方法,RL下的公平和非公平鎖都有不同的實現,下面會詳解

2、addWaiter方法是AQS的一個預設實現方法,負責構造當前執行緒所在的Node,並將其設定到佇列的尾巴上

3、acquireQueued方法也是AQS的預設實現,旨在設定CLH佇列的head和阻塞當前執行緒

上面的三個方法下面也會一一介紹

(3)、tryAcquire()的原始碼跟蹤與解析

  • tryAcquire()方法可以理解成嘗試獲取鎖,如果獲取成功即表示當前執行緒擁有了鎖;跟蹤原始碼需要注意的一點是:該方法在非公平鎖(NonFairSync)下的實現最終呼叫的是Sync裡的nonfairTryAcquire方法,所以我們直接觀察該方法是如何實現的即可
/**
 * java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire(int)
 */
final boolean nonfairTryAcquire(int acquires) {
    // 當前執行緒
    final Thread current = Thread.currentThread();
    // 獲取當前state的值
    int c = getState();
    if (c == 0) {
        /**
         * 非公平鎖發現資源未被佔用時直接CAS嘗試搶佔資源;而公平鎖發現資源未被佔用時
         * 先判斷佇列裡是否還有前置節點再等待,沒有才會去搶佔資源
         */
        if (compareAndSetState(0, acquires)) {
            // 如果成功將state值變成1表示爭搶鎖成功,設定當前擁有獨佔存取權的執行緒。
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    /** 
     * 如果state!=0表示有爭用,再判斷當前系統擁有獨佔許可權的執行緒是不是當前執行緒,
     * 如果是,則需要支援執行緒重入,將state的值加1
     */
    else if (current == getExclusiveOwnerThread()) {// 處理可重入的邏輯
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    // state既不等於0也不需要重入則返回false;表示獲取鎖失敗,程式碼返回後繼續執行acquireQueued方法
    return false;
}

(4)addWaiter()的原始碼跟蹤與解析

  • 執行到addWaiter方法表示前面的tryAcquire嘗試獲取鎖失敗了,需要由此方法構建Node節點並加入到CLH佇列的末尾;此方法返回的Node即為當前CLH佇列的tail節點
/**
 * java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter(java.util.concurrent.locks.AbstractQueuedSynchronizer.Node)
 */
private Node addWaiter(Node mode) {
    // 構建Node物件
    Node node = new Node(Thread.currentThread(), mode);
    /**
     * 將當前佇列的尾節點賦值給pred,通過命名和下面的程式碼其實可以發現就是想讓tail作為當前節點的前置節點;
     * 但是為什麼不直接用tail而將其賦值給pred再用呢?我想應該是考慮並行環境下tail的參照有可能會被其他執行緒改變
     */
    Node pred = tail;
    if (pred != null) {
        // 如果當前佇列的尾結點(tail)不為空,就將其作為當前Node節點的前置節點
        node.prev = pred;
        // 然後通過AQS自帶的cas方法將當前構建的Node節點插入到佇列的尾巴上
        if (compareAndSetTail(pred, node)) {
            // 如果成功了,前置節點也就是之前的tail節點的後繼節點就是當前節點,賦值
            pred.next = node;
            // 返回構建的Node節點,即當前佇列的tail節點
            return node;
        }
    }
    // 如果佇列的tail節點為空,或者cas設定tail節點失敗的話呼叫此方法;旨在重新設定佇列的tail節點
    enq(node);
    return node;
}

(5)、acquireQueued()的原始碼跟蹤與解析

  • 當執行緒通過tryAcquire上鎖失敗,然後通過addWaiter將當前執行緒新增到佇列末尾後,通過此方法再次判斷是否輪到當前節點,並再次嘗試獲取鎖,獲取不到的話進行阻塞操作,原始碼與註釋如下:
/**
 * java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued(java.util.concurrent.locks.AbstractQueuedSynchronizer.Node, int)
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 獲取tail節點的前置節點
            final Node p = node.predecessor();
            /**
             * 如果前置節點就是頭節點表示當前tail節點就是第二個節點,就可以嘗試著去獲取鎖,
             * 然後將tail節點設定成頭節點,返回執行緒中斷狀態為false;表示當前執行緒獲取到鎖
             */
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                /**
                 * 既然tail已經獲取到鎖了,那麼前置節點就沒用了,這裡將前置節點的next設定為空,
                 * 是為了方便垃圾回收,因為如果不指定為空,前置節點的next就是當前的tail節點,
                 * 不會被回收
                 */
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            /**
             * 如果前置節點不為head,或者雖然前置節點是head但是獲取鎖失敗,那麼就
             * 需要在這裡將執行緒阻塞,阻塞利用的是LockSupport.park(thread)來實現的
             */
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            // 退出獲取鎖
            cancelAcquire(node);
    }
}

至此,RL非公平鎖加鎖的過程的原始碼跟蹤完畢,流程也不算複雜,下面簡單梳理一遍:

1、上來直接嘗試獲取鎖(修改state值),成功表示獲取成功

2、否則執行tryAcquire方法嘗試通過cas的方式獲取鎖,並處理可能存在的重入操作

3、獲取失敗則通過addWriter方法構建Node節點並加入CLH佇列的末尾

4、然後在acquireQueued裡再次獲取鎖,獲取失敗則阻塞當前執行緒;

下面簡單畫了一下lock()方法的呼叫泳道圖

1、呼叫父類別AQS的compareAndSetState通過cas的模式嘗試將state狀態改為1,修改成功則持有鎖,將當前執行緒設為ExclusiveOwnerThread

3、unLock方法的原始碼跟蹤

  • 釋放鎖其實就是將state狀態減1,然後處理可重入邏輯,如果沒有重入的話直接喚醒當前佇列的head節點,把當前執行緒所在的Node節點從佇列中剔除
  • unLock方法對應AQS的tryRelease模板方法的實現,其沒有lock那麼複雜,因為不用支援公平和非公平鎖,所以其可以直接在sync中呼叫AQS提供的release方法,然後觸發tryRelease,呼叫sync裡的tryRelease實現從而實現解鎖

AQS的release原始碼

/**
 * java.util.concurrent.locks.AbstractQueuedSynchronizer#release(int)
 */
public final boolean release(int arg) {
    // 嘗試釋放鎖
    if (tryRelease(arg)) {
        // 釋放成功,判斷當前佇列頭節點是否為空,不為空並且等待狀態不等於0則喚醒當前佇列的頭節點
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

RL的tryRelease實現

/**
 * java.util.concurrent.locks.ReentrantLock.Sync#tryRelease(int)
 * @param releases
 * @return
 */
protected final boolean tryRelease(int releases) {
    // state減1
    int c = getState() - releases;
    // 如果當前執行緒不是正在獲取到鎖的執行緒直接拋異常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 如果state減1後等於0表示沒有重入,表示釋放鎖成功,將當前獲取鎖的執行緒置空
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    // 將最新的state狀態更新到AQS中
    setState(c);
    return free;
}

unlock()總結:

1、呼叫父類別AQS的release方法實際呼叫的是tryRelease這個模板方法由ReentrantLock本身實現

2、tryRelease方法嘗試將state減1,如果減完等於0表示解鎖成功,將ExclusiveOwner執行緒設為空;並且喚醒佇列的頭節點(unparkSuccessor)。

3、如果不等於0表示解鎖失敗,將state設為減1過後的值;也是為了可重入