如果 把ReentrantLock比做一個人的話,那麼 AQS 就是他的靈魂。離開 AQS 談論鎖都是耍流氓
繼承,模板方法設計模式
accquire
acquireInterruptibly
tryAcquireNanos
acquireShared
acquireSharedInterruptibly
tryAcquireSharedNanos
release
releaseShared
獨佔式獲取 tryAcquire
獨佔式釋放 tryRelease
共用式獲取 tryAcquireShared
共用式釋放 tryReleaseShared
這個同步器是否處於獨佔模式 isHeldExclusively
getState:獲取當前的同步狀態
setState:設定當前同步狀態
compareAndSetState 使用CAS設定狀態,保證狀態設定的原子性
final void lock() {
acquire(1);
}
//p1197
public final void acquire(int arg) {
//tryAcquire 為需要覆蓋,需要子類實現的方法
//EXCLUSIVE表示正在一個獨佔模式下等待
//如果一個執行緒執行 tryAcquire 恰好拿到了鎖,那麼就不再執行acquireQueued了。也不會放到待執行的佇列了
if (!tryAcquire(arg) &&
//addWaiter 下面 p605
//acquireQueued 下面857
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//子類實現tryAcquire 方法範例
protected boolean tryAcquire(int arg) {
if(compareAndSetState(0,1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
把呼叫 lock 的執行緒包裝成 node 節點放到佇列中,並把這個 node 節點返回
判斷當前佇列有沒有尾節點(佇列是否為空):
//p605
//mode == null
private Node addWaiter(Node mode) {
// 使用當前執行緒 構造一個 node,構造器如下 p505
Node node = new Node(Thread.currentThread(), mode);
//如果當前有尾節點(等待佇列不為空,非初始化)
Node pred = tail;
if (pred != null) {
// 當前節點的前驅節點地址賦值
node.prev = pred;
// 使用 CAS 把當新增進來的節點設定成尾節點
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果當前沒有尾節點(等待佇列為空,初始化佇列的時候) p583
enq(node);
return node;
}
//p505
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
如果尾節點為空(那麼head也是空的,整個佇列都是空的),才會執行enq。
所以此方法最後會形成一個『空節點node ⇆ 當前執行緒 node』,這樣一個雙連結串列
//p583
private Node enq(final Node node) {
//自旋
for (;;) {
Node t = tail;
//佇列初始化
if (t == null) {
//cas 方式設定 空節點 到 head 中,且 tail 與 head 同步
if (compareAndSetHead(new Node()))
tail = head;
} else {
//當前節點 的 上一個節點指標指向 if 中 造出來的那個 空節點
node.prev = t;
//cas 設定當前節點為尾節點
if (compareAndSetTail(t, node)) {
//if 中 造出來的那個空節點的下一個節點指向當前節點
t.next = node;
return t;
}
}
}
}
head 節點為已經拿到鎖的執行緒
此方法基本思路
1.先拿到當前節點的上一個節點,如果上一個節點為head且當前節點嘗試拿了一下鎖拿到了。那麼就把當前節點設定成head,上一個節點脫鉤,返回 false
2.否則就會執行shouldParkAfterFailedAcquire、parkAndCheckInterrupt
3.感覺如果p == head && tryAcquire(arg)條件不滿足迴圈將永遠無法結束,當然不會出現死迴圈,奧祕在於parkAndCheckInterrupt會把當前執行緒掛起,從而阻塞住執行緒的呼叫棧(阻塞住的意思就是走到這個方法parkAndCheckInterrupt,就不動了,卡住了)。
//p857
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//如果他的前驅節點是頭結點,且已經執行完畢(state 已經復位),執行tryAcquire後被我當前執行緒搶佔了鎖,那麼執行把當前節點設定為head的操作
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//下面的兩個方法主要是檢查執行緒的狀態,是否被中斷。且阻塞當前執行緒(parkAndCheck)
//shouldParkAfterFailedAcquire p795
//parkAndCheckInterrupt p835
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
//如果發生異常,會執行此段程式碼,取消加入佇列
cancelAcquire(node);
}
}
#p795
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//This node has already set status asking a release to signal it, so it can safely park.
//規則1:該方法首先檢查前趨結點的waitStatus位,如果為SIGNAL,表示前趨結點會通知它,那麼它可以放心大膽地掛起了,返回 true 後呼叫acquireQueued方法的 parkAndCheckInterrupt)將導致執行緒阻塞
return true;
if (ws > 0) {
//Predecessor was cancelled. Skip over predecessors and indicate retry.
//規則2:如果前趨結點是一個被取消的結點怎麼辦呢?那麼就向前遍歷跳過被取消的結點,直到找到一個沒有被取消的結點為止,將找到的這個結點作為它的前趨結點,將找到的這個結點的waitStatus位設定為SIGNAL,返回false表示執行緒不應該被掛起。然後進去acquireQueued方法迴圈
//接下來acquireQueued迴圈會出現兩種結果
//1.因為在這一步幹掉了前一個被取消了的節點,把前前一個節點變成了前一個節點,恰巧前一個節點還是頭結點,所以當前節點嘗試tryAcquire可能會獲取到鎖。
//2.前前一個節點不是頭結點,那麼就會再次進入到該方法。此次前一個節點肯定是SIGNAL狀態,所以當前節點被毫無疑問的掛起(阻塞)
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//如果前繼節點狀態為非SIGNAL、非CANCELLED,則設定前繼的狀態為SIGNAL,返回false後進入acquireQueued的無限迴圈
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
p835
//這個方法的主要任務就是阻斷執行緒
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
public final boolean release(int arg) {
//tryRelease aqs 的具體實現類重寫
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
//設定 waitstatus 為0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
//如果下個節點是null或者下個節點被cancelled,就找到佇列最開始的非cancelled的節點
if (s == null || s.waitStatus > 0) {
s = null;
//為什麼要從後往前找??原因在addWaiter方法:
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null){
//釋放 一個被阻塞的執行緒
LockSupport.unpark(s.thread);
}
}
//我們從這裡可以看到,節點入隊並不是原子操作,也就是說,node.prev = pred; compareAndSetTail(pred, node) 這兩個地方可以看作Tail入隊的原子操作
//但是此時pred.next = node;還沒執行,如果這個時候執行了unparkSuccessor方法,就沒辦法從前往後找了,所以需要從後往前找。
//還有一點原因,在產生CANCELLED狀態節點的時候,先斷開的是Next指標,Prev指標並未斷開,因此也是必須要從後往前遍歷才能夠遍歷完全部的Node。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
呼叫 LockSupport.unpark(s.thread); 釋放當前執行緒阻塞,程式順序往下執行業務程式碼。
引數為空或 false 是非公平鎖。為 true 是公平鎖。
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
//如果直接修改了 state ,那麼就證明現在沒有其他資源在獲得鎖,本執行緒直接拿到鎖。(原來鎖就是操作了 一個 volitale 的 state)
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
//實現 aqs 的 tryAcquire
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//如果是0,證明還沒有執行緒拿到這個鎖,直接 cas 設定 state
if (c == 0) {
if (compareAndSetState(0, acquires)) {
//記錄當前拿到鎖的執行緒是哪個
setExclusiveOwnerThread(current);
return true;
}
}
//鎖重入的時候
//如果當前拿鎖的執行緒是 已經拿到鎖的執行緒,那麼增加重入次數
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//如果當前鎖已經被獲取,且拿鎖的執行緒不是當前執行緒,那麼返回 false,tryAcquire 失敗
return false;
}
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//如果目前沒有執行緒拿鎖
if (c == 0) {
//hasQueuedPredecessors 是 公平鎖的關鍵
//對下面的 if 換一種好理解的寫法
/**
if(!hasQueuedPredecessors()){
if(compareAndSetState(0, acquires)){
setExclusiveOwnerThread(current);
return true;
}
}
*/
//如果 hasQueuedPredecessors 返回 false,那麼證明等待佇列中沒有執行緒在排隊。 那麼執行 compareAndSetState 拿鎖
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
//hasQueuedPredecessors是公平鎖加鎖時判斷等待佇列中是否存在有效節點的方法。
//如果返回False,說明當前執行緒可以爭取共用資源;如果返回True,說明佇列中存在有效節點,當前執行緒必須加入到等待佇列中。
//判斷佇列中是否有 有效節點,以真實執行緒節點 入隊為準。不以虛節點(new Node())為準
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
//head 的下一個元素
Node s;
//1.如果 h != t 為false (h == t 為 true) ,那麼直接返回 false,在tryAcquire 中執行 set state 操作
//2.(s = h.next) == null && h != t 是走到了 enq ①處,此時 tail == node,head == new Node()。
// head 的 next 還未指向 tail,但是 node 已入 tail。所以公平起見當前執行緒就不能再搶佔鎖了。 返回 true,使當前執行緒不會再設定 state
//3.如果第二次拿鎖的執行緒 和第一次拿執行緒的執行緒不是同一個執行緒,那麼第二個執行緒應該讓渡第一個執行緒(如果是同一個執行緒,那麼兩次拿鎖就不分先後)
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
//分析此段程式碼需從 enq 方法看起
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
//①
t.next = node;
return t;
}
}
}
}
總結:
final void lock() {
acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
protected final boolean tryRelease(int releases) {
//可重入鎖 state -1
int c = getState() - releases;
//如果解鎖執行緒 和 持鎖執行緒不是同一個 拋異常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//如果當前執行緒的鎖全部解完,那麼就代表 此執行緒已經釋放了鎖
if (c == 0) {
//釋放鎖成功
free = true;
//持鎖執行緒 釋放
setExclusiveOwnerThread(null);
}
//設定 state 數量
setState(c);
return free;
}