這篇還是接著ReentrantLock的公平鎖,沒看過第0篇的可以先去看上一篇https://www.cnblogs.com/sunankang/p/16456342.html
這篇就以問題為導向,先提出問題,然後根據問題去看程式碼
A,B兩執行緒,A執行緒執行完業務釋放鎖過程中B執行緒新增進了連結串列,如何保證B執行緒能正常醒來
現在假設A執行緒走完tryAcuqire
後獲取到鎖,執行業務程式碼,最後unlock()
tryAcquire程式碼就不進去看了,上篇講過了 現在只需關注兩個點
lock方法中的acquireQueued
用來park
unlock方法中的release
用來unpark
首先來看park的條件是啥
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
進入acquireQueued
方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) //在這裡進行的park
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
也就是shouldParkAfterFailedAcquire
如果這個方法返回true,才會去park
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
現在假設第一種情況,首次進入這個shouldParkAfterFailedAcquire
方法的時候,A執行緒就進入unlock方法了 那麼此時節點狀態如下圖
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
//主要看這段程式碼
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
那麼h!=null
進入,但是頭節點的waitStatus還是0,所以不走unpark,A執行緒結束
A執行緒結束了誰來喚醒B執行緒呢? 回到acquireQueued方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
因為第一次進入shouldParkAfterFailedAcquire
方法中,最後走到else程式碼塊,我們假設沒有發生衝突,修改成功
A執行緒執行完了unlock,而此時鎖的狀態值為0,沒有被持有的狀態,最外層的for(;;)
讓程式碼又重新跑了一遍
第二次的時候if (p == head && tryAcquire(arg))
這個if就會進入,因為現在已經沒有其他執行緒在持有鎖了,所以tryAcquire
嘗試獲取鎖成功,返回ture
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
在setHead方法中將當前節點,咱們這個例子中也就是B節點,設定為head,之後清空上個參照和當前參照的執行緒
最後清除上個節點對B節點的參照,此時節點關係如下
而原來的頭節點沒有任何參照,等待GC即可,也可以看到在程式碼p.next = null; // help GC
這段旁邊寫的註釋 幫助GC
之後將失敗狀態設定為false,返回是否被打斷的變數,lock方法結束,
現在來假設在shouldParkAfterFailedAcquire
方法中修改成功,但此時的A執行緒還沒有走到unlock,當B執行緒馬上要開始走parkAndCheckInterrupt
方法開始park的時候,時間片用完的情況
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
//====假設此時B執行緒在這裡=====
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
此時節點關係如下
A執行緒的unlock就可以進入unparkSuccessor
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
第一個if判斷為true,嘗試修改狀態為0 (這裡沒看懂為什麼是嘗試修改)
if (s == null || s.waitStatus > 0)
這個判斷我們是不進入的,注意unparkSuccessor
這個方法的node引數是head節點,而不是我們的B節點,所以繼續執行下面的if判斷
s
就是B節點,在B執行緒park前喚醒,B執行緒再走到park的時候是不會再進行park的,直接返回,方法結束
A執行緒在執行,B執行緒初始化連結串列中的過程中,A執行緒執行完成,釋放鎖,C執行緒進入
我們只需要看執行緒B初始化連結串列的情況即可
addWaiter
中enq
方法
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
//假設執行緒B走到這裡時間片用完,還沒來得及設定tail
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
那麼此時執行緒A解鎖了,執行緒C呼叫lock方法
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
在tryAcquire
方法的hasQueuedPredecessors
方法中
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
此時tail還是null,而head已經被執行緒B設定了一個空Node,h!=t
為true,h也只是一個空Node,所以(s = h.next) == null
為true,整體返回true,外層取反為false,退出tryAcquire方法去入佇列
那麼入佇列會破壞佇列的初始化或者C執行緒變成第一個排隊的節點嗎?,注意咱們現在假設的執行緒B還沒有獲取到cpu的呼叫,還是停在 tail = head;
程式碼執行前
執行緒C執行addWaiter
方法
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
這個時候tail還是空,進入enq
方法
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
首先第一個判斷是會進入的,這個時候tail還是空,但是if (compareAndSetHead(new Node()))
方法不會成功,來看看程式碼
private final boolean compareAndSetHead(Node update) {
//注意第三個引數 null
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
判斷的是head為null的時候才會進行修改,所以執行緒C沒有修改成功,那麼會一直在for(;;)
中迴圈,直到執行緒B初始化完空的頭節點,也就是執行tail = head;
這段程式碼
如果執行緒B走完了 tail = head;
沒來得及進行第二次迴圈新增B節點的時候,執行緒A解鎖了,執行緒C進來了呢
還是在tryAcquire
方法的hasQueuedPredecessors
中
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
這個時候第一個h!=t
就是false,因為B執行緒已經將head和tail的參照指向同一個空節點了,返回false
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//因為返回false,取反則進行獲取鎖的操作
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
C執行緒直接獲取鎖去執行程式碼了,所以ReentrantLock的公平鎖其實並不是絕對的公平