16.ReentrantLock全解讀

2023-05-06 12:00:46

大家好,我是王有志,歡迎和我聊技術,聊漂泊在外的生活。快來加入我們的Java提桶跑路群:共同富裕的Java人

經歷了AQS的前世今生後,我們已經知道AQS是Java中提供同步狀態原子管理,執行緒阻塞/喚醒,以及執行緒排隊功能的同步器基礎框架。那麼我們今天就來學習通過AQS實現的ReentrantLock。按照慣例,先來看3道關於ReentrantLock的常見面試題:

  • 什麼是ReentrantLock?

  • ReentrantLock內部原理是怎樣的?如何實現可重入性?

  • ReentrantLocksynchronized有什麼區別?該如何選擇?

接下來,我會盡可能的通過剖析原始碼的方式為大家解答以上的題目。

ReentrantLock是什麼?

ReentrantLock譯為可重入鎖,在《一文看懂並行程式設計中的鎖》中我們解釋過鎖的可重入特性:同一執行緒可以多次加鎖,即可以重複進入被鎖定的邏輯中

Doug Lea是這樣描述ReentrantLock的:

A reentrant mutual exclusion {@link Lock} with the same basic behavior and semantics as the implicit monitor lock accessed using {@code synchronized} methods and statements, but with extended capabilities.

「A reentrant mutual exclusion Lock」說明ReentrantLock除了具有可重入的特性,還是一把互斥鎖。接著看後面的內容,ReentrantLock與使用synchronized方法/語句有相同的基本行為和語意。最後的" but with extended capabilities"則表明了ReentrantLock具有更好的拓展能力。

那麼可重入互斥鎖就是ReentrantLock的全部嗎?別急,我們接著往後看:

The constructor for this class accepts an optional fairness parameter. When set true, under contention, locks favor granting access to the longest-waiting thread. Otherwise this lock does not guarantee any particular access order.

ReentrantLock提供了公平/非公平兩種模式預設非公平模式,可以通過構造器引數指定公平模式。

好了,目前為止我們已經對ReentrantLock有了比較清晰的認知了,按照《一文看懂並行程式設計中的鎖》中的分類,ReentrantLock本質是互斥鎖,具有可重入特性,此外ReentrantLock還實現了公平和非公平兩種模式

ReentrantLock怎麼用?

ReentrantLocak的使用非常簡單:

ReentrantLock lock = new ReentrantLock();
lock.lock();
// 業務邏輯
lock.unlock();

通過無參構造器建立ReentrantLock物件後,呼叫lockunlock進行加鎖和解鎖的操作。除了無參構造器外,ReentrantLock還提供了一個有參構造器:

// 無參構造器
public ReentrantLock() {
  sync = new NonfairSync();
}

// 有參構造器
public ReentrantLock(boolean fair) {
  sync = fair ? new FairSync() : new NonfairSync();
}

FairSyncNonfairSyncReentrantLock的內部類,可以通過構造器來指定ReentrantLock的公平模式或非公平模式。具體實現我們先按下不表,先來看ReentrantLock中提供的其它方法。

加鎖方法

除了常用的lock外,ReentrantLock還提供了3個加鎖方法:

// 嘗試獲取鎖
public boolean tryLock();

// 嘗試獲取鎖,否則排隊等候指定時間
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException;

// 嘗試獲取鎖
public void lockInterruptibly() throws InterruptedException;

tryLock直接嘗試獲取鎖,特點在於競爭失敗時直接返回false,不會進入佇列等待。過載方法tryLock(long timeout, TimeUnit unit)增加的在佇列中的最大等待時間,如果鎖競爭失敗,會加入到等待佇列中,再次嘗試獲取鎖,直到超時或中斷

lockInterruptibly的特點是,呼叫thread.interrupt中斷執行緒後丟擲InterruptedException異常,結束競爭。雖然lock也允許中斷執行緒,但它並不會丟擲異常

其他方法

除了常用的加鎖方法外,ReentrantLock還提供了用於分析鎖的方法:

方法宣告 作用
public int getHoldCount() 返回當前執行緒持有鎖的次數,即當前執行緒重入鎖的次數
public final int getQueueLength() 返回等待獲取鎖的執行緒數量估算值
public final boolean hasQueuedThread(Thread thread) 查詢當前執行緒是否在等待獲取鎖
public final boolean hasQueuedThreads() 是否有執行緒在等待獲取鎖
public final boolean isFair() 是否為公平鎖
public boolean isHeldByCurrentThread() 當前執行緒是否持有鎖
public boolean isLocked() 鎖是否被執行緒持有,即鎖是否被使用
public Condition newCondition() 建立條件物件
public int getWaitQueueLength(Condition condition) 等待在該條件上的執行緒數量
public boolean hasWaiters 是否有執行緒在等待在該條件上

ReentrantLock原始碼分析

接下來,我們通過原始碼來分析ReentrantLock的公平/非公平模式,以及重入性的實現原理,並對比不同的加鎖方法的實現差異。

ReentrantLock的結構

我們先來來了解下ReentrantLock的結構:

public class ReentrantLock implements Lock, java.io.Serializable {

  private final Sync sync;
  
  // 同步器
  abstract static class Sync extends AbstractQueuedSynchronizer {}
  
  // 非公平模式同步器
  static final class NonfairSync extends Sync {}
  
  // 公平模式同步器
  static final class FairSync extends Sync {}
}

ReentrantLock僅僅實現了Lock介面,並沒有直接繼承AbstractQueuedSynchronizer,其內部類Sync繼承AbstractQueuedSynchronizer,並提供了FairSyncNonfairSync兩種實現,分別是公平鎖和非公平鎖。

公平/非公平模式

我們已經知道,可以指定不同的引數來建立公平/非公平模式的ReentrantLock,反應到原始碼中是使用不同的Sync的實現類:

public ReentrantLock(boolean fair) {
  sync = fair ? new FairSync() : new NonfairSync();
}

並且在加鎖/解鎖操作中,均由Sync的實現類完成,ReentrantLock只是對Lock介面的實現:

public class ReentrantLock implements Lock, java.io.Serializable {
  public void lock() {
    sync.acquire(1);
  }
  
  public void unlock() {
    sync.release(1);
  }
}

先來回想下《AQS的今生,構建出JUC的基礎》中的acquire方法:

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
  
  public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
      selfInterrupt();
    }
  }
}

AQS自身僅實現了將執行緒新增到等待佇列中的acquireQueued方法,而預留了獲取鎖的tryAcquire方法。

那麼我們不難想到,ReentrantLock的作用機制:繼承自AQS的Sync,實現了tryAcquire方法來獲取鎖,並藉助AQS的acquireQueued實現排隊的功能,而ReentrantLock的公平與否,與tryAcquire的實現方式是息息相關的。

公平鎖FairSync

FairSync非常簡單,僅做了tryAcquire方法的實現:

static final class FairSync extends Sync {
  @ReservedStackAccess
  protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    // 獲取同步狀態,AQS實現
    int c = getState();
    // 判斷同步狀態
    // c == 0時,表示沒有執行緒持有鎖
    // c != 0時,表示有執行緒持有鎖
    if (c == 0) {
      // hasQueuedPredecessors判斷是否有已經在等待鎖的執行緒
      if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(current);
        return true;
      }
    } else if (current == getExclusiveOwnerThread()) {
      // 執行緒重入,同步狀態+1
      int nextc = c + acquires;
      if (nextc < 0) {
        throw new Error("Maximum lock count exceeded");
      }
      // 更新同步狀態
      setState(nextc);
      return true;
    }
    return false;
  }
}

c == 0時,鎖未被任何執行緒持有,通過hasQueuedPredecessors判斷是否已經有等待鎖的執行緒,如果沒有正在等待的執行緒,則通過compareAndSetState(0, acquires)嘗試替換同步狀態來獲取鎖;當c != 0時,鎖已經被執行緒持有,通過current == getExclusiveOwnerThread判斷是否為當前執行緒持有,如果是則認為是重入,執行int nextc = c + acquires,更新同步狀態setState(nextc),並返回成功。

FairSync的公平性體現在獲取鎖前先執行hasQueuedPredecessors,確認是否已經有執行緒在等待鎖,如果有則tryAcquire執行失敗,默默的執行AQS的acquireQueued加入等待佇列中即可。

非公平鎖NonfairSync

NonfairSync也只是做了tryAcquire的實現,而且還只是掉用了父類別的nonfairTryAcquire方法:

static final class NonfairSync extends Sync {
  protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
  }
}

abstract static class Sync extends AbstractQueuedSynchronizer {
  
  @ReservedStackAccess
  final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    
    int c = getState();
    if (c == 0) {
      if (compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(current);
        return true;
      }
    } else if (current == getExclusiveOwnerThread()) {
      int nextc = c + acquires;
      if (nextc < 0) {
        throw new Error("Maximum lock count exceeded");
      }
      setState(nextc);
      return true;
    }
    return false;
  }
}

nonfairTryAcquireFairSync#tryAcquire簡直是一模一樣,忽略方法宣告,唯一的差別就在於,當c == 0時,nonfairTryAcquire並不會呼叫hasQueuedPredecessors確認是否有執行緒正在等待獲取鎖,而是直接通過compareAndSetState(0, acquires)嘗試替換同步狀態來獲取鎖。

NonfairSync的不公平體現在獲取鎖前不會不會確認是否有執行緒正在等待鎖,而是直接獲取鎖,如果獲取失敗,依舊會執行AQS的acquireQueued加入等待佇列。

可重入性的實現

AQS的今生,構建出JUC的基礎》中,提到過ReentrantLock的重入性依賴於同步狀態state作為計數器的特性實現,在公平鎖FairSync和非公平鎖NonfairSync的實現中我們也看到,執行緒重入時會執行同步狀態+1的操作:

int nextc = c + acquires;
setState(nextc);

既然lock操作中有同步狀態+1的操作,那麼unlock操作中就一定有同步狀態-1的操作:

public class ReentrantLock implements Lock, java.io.Serializable {
  public void unlock() {
    sync.release(1);
  }
  
  abstract static class Sync extends AbstractQueuedSynchronizer {
    @ReservedStackAccess
    protected final boolean tryRelease(int releases) {
      // 執行緒退出,同步狀態-1
      int c = getState() - releases;
      if (Thread.currentThread() != getExclusiveOwnerThread()) {
        throw new IllegalMonitorStateException();
      }
      boolean free = false;
      if (c == 0) {
        // 同步狀態為0,鎖未被持有,釋放獨佔鎖
        free = true;
        setExclusiveOwnerThread(null);
      }
      setState(c);
      return free;
    }
  }
}

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
  public final boolean release(int arg) {
    if (tryRelease(arg)) {
      Node h = head;
      if (h != null && h.waitStatus != 0){
        unparkSuccessor(h);
      }
      return true;
    }
    return false;
  }
}

tryRelease的實現並不複雜,同步狀態-1後,如果同步狀態為0,表示鎖未被持有,修改鎖的獨佔執行緒,然後更新同步狀態。

我們再來看ReentrantLock的可重入性的實現,是不是非常簡單了?判斷是否是執行緒重入依賴的是getExclusiveOwnerThread方法,獲取當前獨佔鎖的執行緒,記錄重入次數依賴的是同步狀態作為計數器的特性。

現在能夠理解為什麼ReentrantLocklock要與unlock操作成對出現了吧?最後,提個小問題,為什麼lockunlock操作中,只有當c == 0時的lock操作需要使用CAS?

加鎖方法的差異

我們前面已經瞭解過ReentrantLock提供的4個加鎖方法了,分別是:

  • public void lock(),最常用的加鎖方法,允許中斷,但不會丟擲異常,加鎖失敗進入等待佇列;

  • public void lockInterruptibly(),允許中斷,丟擲InterruptedException異常,加鎖失敗進入佇列直到被喚醒或者被中斷;

  • public boolean tryLock(),嘗試直接加鎖,加鎖失敗不會進入佇列,而是直接返回false;

  • public boolean tryLock(long timeout, TimeUnit unit),嘗試直接加鎖,中斷時丟擲InterruptedException異常,加鎖失敗進入佇列,直到指定時間內加鎖成功,或者超時。

lock與lockInterruptibly

lock方法的呼叫:

public class ReentrantLock implements Lock, java.io.Serializable {
  public void lock() {
    sync.acquire(1);
  }
}

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
  public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
      selfInterrupt();
    }
  }
}

lockInterruptibly方法的呼叫:

public class ReentrantLock implements Lock, java.io.Serializable {
  public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
  }
}

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
  public final void acquireInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted()) {
      throw new InterruptedException();
    }
    
    if (!tryAcquire(arg)) {
      doAcquireInterruptibly(arg);
    }
  }
}

可以看到,差異主要體現在acquireQueueddoAcquireInterruptibly的實現上:

final boolean acquireQueued(final Node node, int arg) {
  boolean interrupted = false;
  try {
    for (;;) {
      final Node p = node.predecessor();
      if (p == head && tryAcquire(arg)) {
        setHead(node);
        p.next = null;
        return interrupted;
      }
      // 當parkAndCheckInterrupt為true時,修改interrupted標記為中斷
      if (shouldParkAfterFailedAcquire(p, node))
        interrupted |= parkAndCheckInterrupt();
    }
  } catch (Throwable t) {
    cancelAcquire(node);
    if (interrupted)
      selfInterrupt();
    throw t;
  }
}

private void doAcquireInterruptibly(int arg) throws InterruptedException {
  final Node node = addWaiter(Node.EXCLUSIVE);
  try {
    for (;;) {
      final Node p = node.predecessor();
      if (p == head && tryAcquire(arg)) {
        setHead(node);
        p.next = null;
        return;
      }
      // 當parkAndCheckInterrupt為true時,丟擲異常
      if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
        throw new InterruptedException();
    }
  } catch (Throwable t) {
    cancelAcquire(node);
    throw t;
  }
}

從原始碼上來看,差異體現在對parkAndCheckInterrupt結果的處理方式上,acquireQueued只標記中斷狀態,而doAcquireInterruptibly直接丟擲異常。

tryLock與它的過載方法

public boolean tryLock()的實現非常簡單:

  public boolean tryLock() {
    return sync.nonfairTryAcquire(1);
}

直接呼叫Sync#nonfairTryAcquire,在前面非公平鎖的內容中我們已經知道nonfairTryAcquire只是進行了一次非公平的加鎖嘗試,如果沒有呼叫AQS的acquireQueued不會加入到等待佇列中。

tryLock的過載方法也並不複雜,按照之前的習慣,應該是有著特殊的acquireQueued實現:

public class ReentrantLock implements Lock, java.io.Serializable {
  public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
  }
}

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
  public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted()) {
      throw new InterruptedException();
    }
    return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
  }
  
  private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (nanosTimeout <= 0L)
      return false;
    // 計算超時時間
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    try {
      for (;;) {
        final Node p = node.predecessor();
        if (p == head && tryAcquire(arg)) {
          setHead(node);
          p.next = null;
          return true;
        }
        // 判斷超時時間
        nanosTimeout = deadline - System.nanoTime();
        if (nanosTimeout <= 0L) {
          cancelAcquire(node);
          return false;
        }
        
        // 呼叫LockSupport.parkNanos暫停指定時間
        if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
          LockSupport.parkNanos(this, nanosTimeout);
        
        // 執行緒中斷丟擲異常
        if (Thread.interrupted())
          throw new InterruptedException();
      }
    } catch (Throwable t) {
      cancelAcquire(node);
      throw t;
    }
  }
}

public boolean tryLock(long timeout, TimeUnit unit)的特性依賴於LockSupport.parkNanos暫停執行緒指定時間的能力。另外,我們可以注意到在判斷是否需要park時,對nanosTimeoutSPIN_FOR_TIMEOUT_THRESHOLD的判斷:

  • nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD時,認為一次park和upark對效能的影響小於自旋nanosTimeout納秒;

  • nanosTimeout < SPIN_FOR_TIMEOUT_THRESHOLD時,認為一次park和upark對效能的影響大於自旋nanosTimeout納秒。

到這裡我們就把4個加鎖方法的差異講完了,大體邏輯是相似的(如,喚醒頭節點),只是為了實現某些特性新增了一些細節,大家可以認真閱讀原始碼,很容易就能看出差異。

結語

關於ReentrantLock的內容到這裡就結束了,因為已經把AQS的部分單獨拆了出來,所以今天並沒有太複雜的內容。大家的重點可以放在ReentrantLock是如何藉助AQS實現公平/非公平模式,以及可重入的特性上,諸如getHoldCountisFair這類方法,相信大家已經能夠想象到是如何實現的了,可以結合原始碼驗證自己的想法。

最後,希望今天的內容能夠幫助你更清晰的理解ReentrantLock,如果文章中出現錯誤,也希望大家不吝賜教。


好了,今天就到這裡了,Bye~~