大家好,我是王有志,歡迎和我聊技術,聊漂泊在外的生活。快來加入我們的Java提桶跑路群:共同富裕的Java人。
經歷了AQS的前世和今生後,我們已經知道AQS是Java中提供同步狀態原子管理,執行緒阻塞/喚醒,以及執行緒排隊功能的同步器基礎框架。那麼我們今天就來學習通過AQS實現的ReentrantLock
。按照慣例,先來看3道關於ReentrantLock
的常見面試題:
什麼是ReentrantLock
?
ReentrantLock
內部原理是怎樣的?如何實現可重入性?
ReentrantLock
和synchronized
有什麼區別?該如何選擇?
接下來,我會盡可能的通過剖析原始碼的方式為大家解答以上的題目。
ReentrantLock
譯為可重入鎖,在《一文看懂並行程式設計中的鎖》中我們解釋過鎖的可重入特性:同一執行緒可以多次加鎖,即可以重複進入被鎖定的邏輯中。
Doug Lea是這樣描述ReentrantLock
的:
A reentrant mutual exclusion {@link Lock} with the same basic behavior and semantics as the implicit monitor lock accessed using {@code synchronized} methods and statements, but with extended capabilities.
「A reentrant mutual exclusion Lock」說明ReentrantLock除了具有可重入的特性,還是一把互斥鎖。接著看後面的內容,ReentrantLock
與使用synchronized
方法/語句有相同的基本行為和語意。最後的" but with extended capabilities"則表明了ReentrantLock
具有更好的拓展能力。
那麼可重入互斥鎖就是ReentrantLock
的全部嗎?別急,我們接著往後看:
The constructor for this class accepts an optional fairness parameter. When set true, under contention, locks favor granting access to the longest-waiting thread. Otherwise this lock does not guarantee any particular access order.
ReentrantLock提供了公平/非公平兩種模式,預設非公平模式,可以通過構造器引數指定公平模式。
好了,目前為止我們已經對ReentrantLock
有了比較清晰的認知了,按照《一文看懂並行程式設計中的鎖》中的分類,ReentrantLock
本質是互斥鎖,具有可重入特性,此外ReentrantLock
還實現了公平和非公平兩種模式。
ReentrantLocak
的使用非常簡單:
ReentrantLock lock = new ReentrantLock();
lock.lock();
// 業務邏輯
lock.unlock();
通過無參構造器建立ReentrantLock
物件後,呼叫lock
和unlock
進行加鎖和解鎖的操作。除了無參構造器外,ReentrantLock
還提供了一個有參構造器:
// 無參構造器
public ReentrantLock() {
sync = new NonfairSync();
}
// 有參構造器
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
FairSync
和NonfairSync
是ReentrantLock
的內部類,可以通過構造器來指定ReentrantLock
的公平模式或非公平模式。具體實現我們先按下不表,先來看ReentrantLock
中提供的其它方法。
除了常用的lock
外,ReentrantLock
還提供了3個加鎖方法:
// 嘗試獲取鎖
public boolean tryLock();
// 嘗試獲取鎖,否則排隊等候指定時間
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException;
// 嘗試獲取鎖
public void lockInterruptibly() throws InterruptedException;
tryLock
直接嘗試獲取鎖,特點在於競爭失敗時直接返回false,不會進入佇列等待。過載方法tryLock(long timeout, TimeUnit unit)
增加的在佇列中的最大等待時間,如果鎖競爭失敗,會加入到等待佇列中,再次嘗試獲取鎖,直到超時或中斷。
lockInterruptibly
的特點是,呼叫thread.interrupt
中斷執行緒後丟擲InterruptedException
異常,結束競爭。雖然lock
也允許中斷執行緒,但它並不會丟擲異常。
除了常用的加鎖方法外,ReentrantLock
還提供了用於分析鎖的方法:
方法宣告 | 作用 |
---|---|
public int getHoldCount() |
返回當前執行緒持有鎖的次數,即當前執行緒重入鎖的次數 |
public final int getQueueLength() |
返回等待獲取鎖的執行緒數量估算值 |
public final boolean hasQueuedThread(Thread thread) |
查詢當前執行緒是否在等待獲取鎖 |
public final boolean hasQueuedThreads() |
是否有執行緒在等待獲取鎖 |
public final boolean isFair() |
是否為公平鎖 |
public boolean isHeldByCurrentThread() |
當前執行緒是否持有鎖 |
public boolean isLocked() |
鎖是否被執行緒持有,即鎖是否被使用 |
public Condition newCondition() |
建立條件物件 |
public int getWaitQueueLength(Condition condition) |
等待在該條件上的執行緒數量 |
public boolean hasWaiters |
是否有執行緒在等待在該條件上 |
接下來,我們通過原始碼來分析ReentrantLock
的公平/非公平模式,以及重入性的實現原理,並對比不同的加鎖方法的實現差異。
我們先來來了解下ReentrantLock
的結構:
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;
// 同步器
abstract static class Sync extends AbstractQueuedSynchronizer {}
// 非公平模式同步器
static final class NonfairSync extends Sync {}
// 公平模式同步器
static final class FairSync extends Sync {}
}
ReentrantLock
僅僅實現了Lock
介面,並沒有直接繼承AbstractQueuedSynchronizer
,其內部類Sync
繼承AbstractQueuedSynchronizer
,並提供了FairSync
和NonfairSync
兩種實現,分別是公平鎖和非公平鎖。
我們已經知道,可以指定不同的引數來建立公平/非公平模式的ReentrantLock
,反應到原始碼中是使用不同的Sync
的實現類:
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
並且在加鎖/解鎖操作中,均由Sync
的實現類完成,ReentrantLock
只是對Lock
介面的實現:
public class ReentrantLock implements Lock, java.io.Serializable {
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
}
先來回想下《AQS的今生,構建出JUC的基礎》中的acquire
方法:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
selfInterrupt();
}
}
}
AQS自身僅實現了將執行緒新增到等待佇列中的acquireQueued
方法,而預留了獲取鎖的tryAcquire
方法。
那麼我們不難想到,ReentrantLock
的作用機制:繼承自AQS的Sync
,實現了tryAcquire
方法來獲取鎖,並藉助AQS的acquireQueued
實現排隊的功能,而ReentrantLock
的公平與否,與tryAcquire
的實現方式是息息相關的。
FairSync
非常簡單,僅做了tryAcquire
方法的實現:
static final class FairSync extends Sync {
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 獲取同步狀態,AQS實現
int c = getState();
// 判斷同步狀態
// c == 0時,表示沒有執行緒持有鎖
// c != 0時,表示有執行緒持有鎖
if (c == 0) {
// hasQueuedPredecessors判斷是否有已經在等待鎖的執行緒
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
// 執行緒重入,同步狀態+1
int nextc = c + acquires;
if (nextc < 0) {
throw new Error("Maximum lock count exceeded");
}
// 更新同步狀態
setState(nextc);
return true;
}
return false;
}
}
當c == 0
時,鎖未被任何執行緒持有,通過hasQueuedPredecessors
判斷是否已經有等待鎖的執行緒,如果沒有正在等待的執行緒,則通過compareAndSetState(0, acquires)
嘗試替換同步狀態來獲取鎖;當c != 0
時,鎖已經被執行緒持有,通過current == getExclusiveOwnerThread
判斷是否為當前執行緒持有,如果是則認為是重入,執行int nextc = c + acquires
,更新同步狀態setState(nextc)
,並返回成功。
FairSync
的公平性體現在獲取鎖前先執行hasQueuedPredecessors
,確認是否已經有執行緒在等待鎖,如果有則tryAcquire
執行失敗,默默的執行AQS的acquireQueued
加入等待佇列中即可。
NonfairSync
也只是做了tryAcquire
的實現,而且還只是掉用了父類別的nonfairTryAcquire
方法:
static final class NonfairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
@ReservedStackAccess
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) {
throw new Error("Maximum lock count exceeded");
}
setState(nextc);
return true;
}
return false;
}
}
nonfairTryAcquire
與FairSync#tryAcquire
簡直是一模一樣,忽略方法宣告,唯一的差別就在於,當c == 0
時,nonfairTryAcquire
並不會呼叫hasQueuedPredecessors
確認是否有執行緒正在等待獲取鎖,而是直接通過compareAndSetState(0, acquires)
嘗試替換同步狀態來獲取鎖。
NonfairSync
的不公平體現在獲取鎖前不會不會確認是否有執行緒正在等待鎖,而是直接獲取鎖,如果獲取失敗,依舊會執行AQS的acquireQueued
加入等待佇列。
《AQS的今生,構建出JUC的基礎》中,提到過ReentrantLock
的重入性依賴於同步狀態state作為計數器的特性實現,在公平鎖FairSync
和非公平鎖NonfairSync
的實現中我們也看到,執行緒重入時會執行同步狀態+1的操作:
int nextc = c + acquires;
setState(nextc);
既然lock
操作中有同步狀態+1的操作,那麼unlock
操作中就一定有同步狀態-1的操作:
public class ReentrantLock implements Lock, java.io.Serializable {
public void unlock() {
sync.release(1);
}
abstract static class Sync extends AbstractQueuedSynchronizer {
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
// 執行緒退出,同步狀態-1
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread()) {
throw new IllegalMonitorStateException();
}
boolean free = false;
if (c == 0) {
// 同步狀態為0,鎖未被持有,釋放獨佔鎖
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0){
unparkSuccessor(h);
}
return true;
}
return false;
}
}
tryRelease
的實現並不複雜,同步狀態-1後,如果同步狀態為0,表示鎖未被持有,修改鎖的獨佔執行緒,然後更新同步狀態。
我們再來看ReentrantLock
的可重入性的實現,是不是非常簡單了?判斷是否是執行緒重入依賴的是getExclusiveOwnerThread
方法,獲取當前獨佔鎖的執行緒,記錄重入次數依賴的是同步狀態作為計數器的特性。
現在能夠理解為什麼ReentrantLock
中lock
要與unlock
操作成對出現了吧?最後,提個小問題,為什麼lock
和unlock
操作中,只有當c == 0
時的lock
操作需要使用CAS?
我們前面已經瞭解過ReentrantLock
提供的4個加鎖方法了,分別是:
public void lock()
,最常用的加鎖方法,允許中斷,但不會丟擲異常,加鎖失敗進入等待佇列;
public void lockInterruptibly()
,允許中斷,丟擲InterruptedException
異常,加鎖失敗進入佇列直到被喚醒或者被中斷;
public boolean tryLock()
,嘗試直接加鎖,加鎖失敗不會進入佇列,而是直接返回false;
public boolean tryLock(long timeout, TimeUnit unit)
,嘗試直接加鎖,中斷時丟擲InterruptedException
異常,加鎖失敗進入佇列,直到指定時間內加鎖成功,或者超時。
lock
方法的呼叫:
public class ReentrantLock implements Lock, java.io.Serializable {
public void lock() {
sync.acquire(1);
}
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
selfInterrupt();
}
}
}
lockInterruptibly
方法的呼叫:
public class ReentrantLock implements Lock, java.io.Serializable {
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
if (!tryAcquire(arg)) {
doAcquireInterruptibly(arg);
}
}
}
可以看到,差異主要體現在acquireQueued
和doAcquireInterruptibly
的實現上:
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
return interrupted;
}
// 當parkAndCheckInterrupt為true時,修改interrupted標記為中斷
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}
private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
return;
}
// 當parkAndCheckInterrupt為true時,丟擲異常
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
從原始碼上來看,差異體現在對parkAndCheckInterrupt
結果的處理方式上,acquireQueued
只標記中斷狀態,而doAcquireInterruptibly
直接丟擲異常。
public boolean tryLock()
的實現非常簡單:
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
直接呼叫Sync#nonfairTryAcquire
,在前面非公平鎖的內容中我們已經知道nonfairTryAcquire
只是進行了一次非公平的加鎖嘗試,如果沒有呼叫AQS的acquireQueued
不會加入到等待佇列中。
tryLock
的過載方法也並不複雜,按照之前的習慣,應該是有著特殊的acquireQueued
實現:
public class ReentrantLock implements Lock, java.io.Serializable {
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}
return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
}
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
// 計算超時時間
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
return true;
}
// 判斷超時時間
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) {
cancelAcquire(node);
return false;
}
// 呼叫LockSupport.parkNanos暫停指定時間
if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
LockSupport.parkNanos(this, nanosTimeout);
// 執行緒中斷丟擲異常
if (Thread.interrupted())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
}
public boolean tryLock(long timeout, TimeUnit unit)
的特性依賴於LockSupport.parkNanos
暫停執行緒指定時間的能力。另外,我們可以注意到在判斷是否需要park時,對nanosTimeout
與SPIN_FOR_TIMEOUT_THRESHOLD
的判斷:
當nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD
時,認為一次park和upark對效能的影響小於自旋nanosTimeout
納秒;
當nanosTimeout < SPIN_FOR_TIMEOUT_THRESHOLD
時,認為一次park和upark對效能的影響大於自旋nanosTimeout
納秒。
到這裡我們就把4個加鎖方法的差異講完了,大體邏輯是相似的(如,喚醒頭節點),只是為了實現某些特性新增了一些細節,大家可以認真閱讀原始碼,很容易就能看出差異。
關於ReentrantLock
的內容到這裡就結束了,因為已經把AQS的部分單獨拆了出來,所以今天並沒有太複雜的內容。大家的重點可以放在ReentrantLock
是如何藉助AQS實現公平/非公平模式,以及可重入的特性上,諸如getHoldCount
,isFair
這類方法,相信大家已經能夠想象到是如何實現的了,可以結合原始碼驗證自己的想法。
最後,希望今天的內容能夠幫助你更清晰的理解ReentrantLock
,如果文章中出現錯誤,也希望大家不吝賜教。
好了,今天就到這裡了,Bye~~