同步狀態變數:state就是那個共用資源(private volatile int state;) Lock類繼承AQS類並定義lock()、unLock()的方法,表示獲取鎖和釋放鎖。多執行緒並行存取同一個lock範例,lock()方法會cas修改state變數,修改成功的執行緒獲得鎖,其他執行緒進入AQS佇列等待。
沒有必要!sync佇列是雙向連結串列結構,出隊時,head交替方式,只需要修改head和head後繼2個節點參照關係;固定head,就要修改head,head後繼,以及head後繼的後繼 共3個節點。顯然前者效率更高
不存在的,因為經過判斷得出此時node就是head的後繼。並且必須由這個取消節點node來喚醒後繼,要不node執行緒結束後,就沒有執行緒能夠喚醒佇列裡的其他節點了。
先說結果:由搶到鎖的那個執行緒來喚醒!
上述的場景是存在的,例如在非公平鎖模式中,B執行緒被A執行緒喚醒,A結束,B成為head,B去執行tryAcquire(),但此時C執行緒搶佔到鎖,B執行tryAcquire()沒有拿到鎖,再次park阻塞。C執行緒執行結束後將A喚醒
只有將前置節點狀態改為SIGNAL,才能確保當前節點可以被前置unPark喚醒。也就是說阻塞自己前先保證一定能夠被喚醒。因為程式碼中:
獨佔模式下,喚醒後繼前先限制:h.waitStatus != 0
共用模式下,喚醒後繼前先限制:h.waitStatus=SIGNAL
表示本執行緒在獲取資源期間,如果被其他執行緒中斷,本執行緒不會因為中斷而取消獲取資源,只是將中斷標記傳遞下去。
When acquired in exclusive mode,
* attempted acquires by other threads cannot succeed. Shared mode
* acquires by multiple threads may (but need not) succeed. This class
* does not "understand" these differences except in the
* mechanical sense that when a shared mode acquire succeeds, the next
* waiting thread (if one exists) must also determine whether it can
* acquire as well. Threads waiting in the different modes share the
* same FIFO queue.
- 共用模式:允許多個執行緒同時獲取資源;當一個節點的執行緒獲取共用資源後,需要要通知後繼共用節點的執行緒,也可以獲取了。共用節點具有傳播性,傳播性的目的也是儘快通知其他等待的執行緒儘快獲取鎖。
- 獨佔模式: 只能夠一個執行緒佔有資源,其它嘗試獲取資源的執行緒將會進入到佇列等待。
- 響應中斷並終止:執行緒只要被中斷就不會獲取資源:兩種情況的中斷:1、剛嘗試獲取、2、進入佇列中等待,前者立即停止獲取,後者執行取消邏輯,等待節點變為取消狀態
A、B先後進入佇列,狀態都是0。A獲得資源,進入setHeadAndPropagate晉升為head,A進入doReleaseShared嘗試喚醒B時,但B還沒將A改為signal,因為A還是0,A將狀態改為PROPAGATE
對於AbstractQueuedSynchronizer的分析,最核心的就是sync queue的分析。
//addWaiter():
node.prev = pred; // 1 更新node節點的prev域
if (compareAndSetTail(pred, node)) {
pred.next = node; //2 更新node前驅的next域
return node;
}
//enq():
node.prev = t; // 1 更新node節點的prev域
if (compareAndSetTail(t, node)) {
t.next = node;//2 更新node前驅的next域
return t;
}
//unparkSuccessor():
Node s = node.next; //通過.next來直接獲取到節點的後繼節點,這個節點的後繼的prev一定指向節點本身
//....
if (s != null)
LockSupport.unpark(s.thread);
- addWaiter() 或者enq()插入節點時,都是先更新節點的prev域,再更新它前驅的next域。那麼通過node.next()取到的後繼,後繼的prev域一定是指向node本身。如果先更新next域,在更新prev域時出現異常,那麼通過.next取到不是完整的節點
- unparkSuccessor()喚醒後繼時,Node s = node.next; 通過.next來獲取node的後繼,後繼的prev一定指向node本身
因為取消節點的next域指向了自身,所以不能從通過next來遍歷,但prev是完整的,所以通過prev來遍歷。
AQS是一個用來構建鎖和同步器的框架。理論參考:JUC同步器框架
三個基本元件相互共同作業:
- 同步狀態的原子性管理;
- 執行緒的阻塞與喚醒;
- 佇列的管理;
同步器一般包含兩種方法,一種是acquire,另一種是release。acquire操作阻塞呼叫的執行緒,直到或除非同步狀態允許其繼續執行。而release操作則是通過某種方式改變同步狀態,使得一或多個被acquire阻塞的執行緒繼續執行。
兩種方式:
分析類,首先就要分析底層採用了何種資料結構,抓住核心點進行分析:
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable
繼承自抽象類:AbstractOwnableSynchronizer,父類別提供獨佔執行緒的設定與獲取的方法
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final long serialVersionUID = 3737899427754241961L;
protected AbstractOwnableSynchronizer() { }// 建構函式
private transient Thread exclusiveOwnerThread; //獨佔模式下的執行緒
// 設定獨佔執行緒
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
// 獲取獨佔執行緒
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
protected boolean tryAcquire(int arg) {//獨佔方式獲取鎖
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) { //釋放獨佔的鎖
throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) { //以共用方式獲取鎖
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {//釋放共用鎖
throw new UnsupportedOperationException();
}
protected boolean isHeldExclusively() {//是否獨佔資源
throw new UnsupportedOperationException();
}
關於重寫說明:
目的是將共用資源state的讀寫交給子類管理,AQS專注在佇列的維護以及執行緒的阻塞與喚醒
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final long serialVersionUID = 7373984972572414691L;
// 頭節點
private transient volatile Node head;
// 尾節點
private transient volatile Node tail;
//0:表示沒有執行緒獲取到鎖;1表示有執行緒獲取到鎖;大於1:表示有執行緒獲得了鎖,且允許重入
private volatile int state;
// 自旋時間
static final long spinForTimeoutThreshold = 1000L;
// 以下跟cas有關
private static final Unsafe unsafe = Unsafe.getUnsafe(); // Unsafe類範例
private static final long stateOffset; // state記憶體偏移地址
private static final long headOffset; // head記憶體偏移地址
private static final long tailOffset; // state記憶體偏移地址
private static final long waitStatusOffset;// tail記憶體偏移地址
private static final long nextOffset; // next記憶體偏移地址
// 靜態初始化塊
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
}
說明:
執行緒封裝成Node並具備狀態
static final class Node
{
// 模式,分為共用與獨佔
static final Node SHARED = new Node();// 共用模式
static final Node EXCLUSIVE = null; // 獨佔模式
// 節點狀態
static final int CANCELLED = 1;//表示當前的執行緒被取消
static final int SIGNAL = -1;//表示當前節點的後繼節點包含的執行緒需要被執行【被unpark】,
static final int CONDITION = -2;//表示當前節點在等待condition,也就是在condition佇列中
static final int PROPAGATE = -3;//表示當前場景下後續的acquireShared能夠得以執行
volatile int waitStatus;//節點狀態;表示當前節點在sync佇列中,等待著獲取鎖
volatile Node prev; // 指向當前節點的前驅
volatile Node next;// 指向當前節點的後繼
volatile Thread thread;//節點所對應的執行緒
Node nextWaiter;// 下一個等待者 只跟condition有關
private transient volatile Node head; // 頭節點 懶載入
private transient volatile Node tail; //尾節點 懶載入
private volatile int state; // 同步狀態
// 節點是否在共用模式下等待
final boolean isShared() {
return nextWaiter == SHARED;
}
// 獲取前驅節點,若前驅節點為空,丟擲異常
final Node predecessor() throws NullPointerException {
Node p = prev;// 儲存前驅節點
if (p == null)
throw new NullPointerException();
else
return p;
}
// 無參建構函式
Node() { // Used to establish initial head or SHARED marker
}
// 建構函式
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
關於Node說明:
每個被阻塞的執行緒都會被封裝成一個Node節點,放入佇列。Node包含了一個Thread型別的參照,並且有自己的狀態:
protected AbstractQueuedSynchronizer() { } //預設的無參構造
public final void acquireShared(int arg) {...} // 獲取共用資源的入口(忽略中斷)
protected int tryAcquireShared(int arg); // 嘗試獲取共用資源
private void doAcquireShared(int arg) {...} // AQS中獲取共用資源流程整合
private Node addWaiter(Node mode){...} // 將node加入到同步佇列的尾部
protected int tryAcquireShared(int arg); // 嘗試獲取共用資源
private void setHeadAndPropagate(Node node, int propagate) {...} // 設定 同步佇列的head節點,以及觸發"傳播"操作
private void doReleaseShared() {...} // 遍歷同步佇列,調整節點狀態,喚醒待申請節點
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {...} // 如果獲取資源失敗,則整理隊中節點狀態,並判斷是否要將執行緒掛起
private final boolean parkAndCheckInterrupt() {...} // 將執行緒掛起,並在掛起被喚醒後檢查是否要中斷執行緒(返回是否中斷)
private void cancelAcquire(Node node) {...} // 取消當前節點獲取資源,將其從同步佇列中移除
該函數以獨佔模式獲取(資源),忽略中斷。
流程如下:
原始碼如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt(); //來到這裡,表示執行緒拿到鎖,並且讀取到執行緒的中斷標識為true,呼叫selfInterrupt()來恢復執行緒的interrupted中斷標誌(被parkAndCheckInterrupt()擦除了,所以再設定一次)。
}
static void selfInterrupt() {
Thread.currentThread().interrupt();//執行緒設定interrupted中斷標誌
}
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
acquire()總結
addWaiter:快速新增的方式往sync queue尾部新增節點
// 新增等待者
private Node addWaiter(Node mode) {
// 新生成一個節點
Node node = new Node(Thread.currentThread(), mode);
// 建立臨時參照pred,跟tail指向相同地址
Node pred = tail;
if (pred != null) { // 尾節點不為空,即佇列已經初始化過
// 將node的prev域連線到尾節點
node.prev = pred;
if (compareAndSetTail(pred, node)) { // cas更新tail,指向新建立的node
// 設定尾節點的next域為node
pred.next = node; // 結合 node.prev = pred; 形成雙向連結串列
return node; // 返回新生成的節點
}
}
enq(node); // 佇列還未初始化,或者是compareAndSetTail操作失敗,則進入enq
return node;
}
//關於並行情景說明:
// 從 Node pred = tail; 到 compareAndSetTail(pred, node); 期間,佇列可能插入了新的節點,pred指向的不是最新的tail,那麼compareAndSetTail(pred, node) 就會執行失敗,同時 node.prev = pred; node的前驅也不是最新的tail。
// 通過enq()來解決並行問題,enq()通過自旋+cas來保證執行緒安全
addWaiter()說明:
// 執行緒安全地建立佇列、或者將節點插入佇列、
private Node enq(final Node node) {
for (;;) { // 自旋+cas,確保節點能夠成功入佇列
Node t = tail;//尾節點
if (t == null) { // 尾節點為空,即還沒被初始化
if (compareAndSetHead(new Node())) // 設定head。 !!!!注意,這裡是new node,沒有使用引數的node,因此head節點不參照任何執行緒
tail = head; // 頭節點與尾節點都指向同一個新生節點。迴圈繼續,進入else後,引數node將插入到佇列
} else { // 尾節點不為空,即已經被初始化過
node.prev = t; // 將node節點的prev域連線到尾節點
if (compareAndSetTail(t, node)) { // 比較更新tail,node成為新的tail
// 設定尾節點的next域為node
t.next = node; // 結合 node.prev = t; 形成雙向連結串列
return t; // 返回Node的前驅節點
}
}
}
}
//CAS head field. Used only by enq.
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
//CAS head field. Used only by enq.
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
enq()方法總結:
作用:sync佇列中的節點在獨佔且忽略中斷的模式下獲取(資源)
原始碼如下:
// sync佇列中的節點在獨佔且忽略中斷的模式下獲取(資源):
final boolean acquireQueued(final Node node, int arg) {
// 標誌
boolean failed = true;
try {
// 中斷標識。如果執行緒喚醒後,中斷標識是true,外層的acquire()將進入selfInterrupt()。
boolean interrupted = false;
// 無限迴圈 :如果前驅不是head,那執行緒將park阻塞,等待前面的節點依次執行,直到被unPark喚醒
for (;;) {
// 獲取node的前驅,如果前驅是head,則表明前面已經沒有執行緒等待了,該執行緒可能成為工作執行緒
final Node p = node.predecessor();
// 前驅為頭節點並且成功獲得鎖
if (p == head && tryAcquire(arg)) {
setHead(node); // node晉升為head
p.next = null; // 舊head的next域指向null,將會被GC,移出佇列
failed = false; // 設定標誌
return interrupted; //拿到鎖,break迴圈,並返回中斷標識
}
//執行到這裡,前驅非head 或者 前驅是head但獲取鎖失敗,那麼:1、將前驅狀態改為signal 2、當前執行緒unPark阻塞
//shouldParkAfterFailedAcquire():尋找非取消狀態的前驅,如果狀態為signal返回true 反則,將前驅狀態改為signal、再返回false
//前驅是signal ,執行parkAndCheckInterrupt()後,當前執行緒park阻塞。一直到執行緒被unPark喚醒,再返回執行緒的中斷狀態
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())//parkAndCheckInterrupt返回true表明執行緒中斷狀態為true
//上面if同時成立,才會執行。
interrupted = true; //那麼把中斷標識置為true
}
} finally { //(有異常,在丟擲之前執行finally;沒有異常,在return之前執行finally)
if (failed)//只有try的程式碼塊出現異常,failed才會是true。什麼情景會產生異常?cancelAcquire分析時有說明
cancelAcquire(node); //執行取消邏輯
}
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
private void setHead(Node node) {
head = node;
node.thread = null;//再次表明head的thread屬性是空的
node.prev = null;
}
acquireQueue()總結:
// 當獲取(資源)失敗後:1、判斷能否將當前執行緒park;2、修改前驅節點狀態為signal
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 獲取前驅節點的狀態
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) // 狀態為SIGNAL
// 只有當前驅節點為 signal時,才返回true ,表示當前執行緒可以安全地park阻塞;其它情況返回false
return true;
//跳過那些CANCELLED狀態的前驅
if (ws > 0) { // 表示狀態為CANCELLED,為1
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0); // 找到pred節點前面最近的一個狀態不為CANCELLED的節點;然後跳出迴圈並返回false
pred.next = node;
} else { // 為PROPAGATE -3 或者是0 ,(為CONDITION -2時,表示此節點在condition queue中)
// cas更新前驅的狀態為SIGNAL.如果前驅是頭節點,那麼頭節點ws=SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
// 不能進行park操作
return false;
}
//CAS waitStatus field of a node.
private static final boolean compareAndSetWaitStatus(Node node,int expect, int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
}
shouldParkAfterFailedAcquire()總結:
// 進行park操作並且返回該執行緒的中斷標識
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); //外面的for迴圈可能會導致多次park,不過沒關係,park允許多次執行
//被喚醒之後,返回中斷標記,即如果是正常喚醒則返回false,如果是由於中斷醒來,就返回true
return Thread.interrupted(); // acquireQueued() 中宣告的interrupted 將會被更新為這裡的返回結果
}
public static boolean interrupted() {
return currentThread().isInterrupted(true);//返回當前執行緒interrupted中斷標記,同時會清除此interrupted標記
}
方法總結:
什麼時候才會執行cancelAcquire?
在lockInterruptibly()會通過丟擲中斷異常來執行cancelAcquire方法,lock方法過程中則不會執行該程式碼,作者這麼些的意圖在於for迴圈內部如果出現不可控的因素導致產生未知的異常,則會執行cancelAcquire,很明顯這屬於一種相對偏保守的保險程式碼。
// 取消獲取鎖
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null) // node為空,返回
return;
node.thread = null;// thread置空 備註1
// Skip cancelled predecessors
Node pred = node.prev;// pred表示:最靠近node並且狀態不等於取消的前驅節點
while (pred.waitStatus > 0)
node.prev = pred = pred.prev; //更新pred,往列頭推進
Node predNext = pred.next; //predNext表示:pred的後繼
// 設定node節點的狀態為CANCELLED
node.waitStatus = Node.CANCELLED; //備註2
if (node == tail && compareAndSetTail(node, pred)) { // 若node節點為尾節點,則pred成為尾節點 備註3
// pred的next域置為null
compareAndSetNext(pred, predNext, null);
} else { // 2、node節點不為尾節點,或者比較設定不成功
int ws;
//下面一串判斷,最終目標:在node移除佇列前,將有效的前驅節點狀態改為signal
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
// pred節點不為頭節點,並且
//pred節點的狀態為SIGNAL)或者
// pred節點狀態小於等於0,並且比較並設定等待狀態為SIGNAL成功,並且pred節點所封裝的執行緒不為空
Node next = node.next;
if (next != null && next.waitStatus <= 0) // 後繼不為空並且後繼的狀態小於等於0
compareAndSetNext(pred, predNext, next); // 比較並設定pred.next = next; 到這裡:node的前驅節點指向node的後繼節點。 備註4
} else {
// 這裡,pred==head (3、即node是head的後繼)或者pred.status=0,-2時 【前面while (pred.waitStatus > 0) 已經限制了pred一定是<=0】,執行:
unparkSuccessor(node); // 喚醒node的後繼
}
node.next = node; // help GC 後繼節點指向自身 備註5
}
}
//修改引數node的next域
private static final boolean compareAndSetNext(Node node, Node expect, Node update) {
return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}
對cancelAcquire()總結之前,先明確以下兩點:
cancelAcquire(node)總結:
cancelAcquire()負責將node移出佇列,並保持佇列中其他節點的順序關係不變,它做了以下工作:
執行cancelAcquire後,佇列變成這樣的:
發現:
// 喚醒node節點的後繼
private void unparkSuccessor(Node node) {
// 獲取node節點的等待狀態
int ws = node.waitStatus;
if (ws < 0) // 狀態值小於0,為SIGNAL -1 或 CONDITION -2 或 PROPAGATE -3
// cas節點狀態為0
compareAndSetWaitStatus(node, ws, 0);//如果head沒有後繼的情況下,狀態會一直=0
Node s = node.next;
//若後繼為空,或後繼已取消,則從尾部往前遍歷 找到最靠近的一個處於正常阻塞狀態的節點進行喚醒
// 什麼時候s==null ? node的後繼節點是取消狀態時,node.next為null
if (s == null || s.waitStatus > 0) {
s = null;
// 由尾節點向前倒著遍歷佇列,但不會超過node節點
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//喚醒s節點執行緒
}
unparkSuccessor()總結:
- 作用:找到有效的後繼節點unPark喚醒
- 尋找有效後繼時從尾往前倒著遍歷:因為取消節點的next域指向了自身,所以不能從通過next來遍歷
- 將發起unPark喚醒的節點(只能是head)狀態改為0(意味著在head喚醒後繼,到被後繼推出佇列的期間,狀態變為0)
以獨佔模式釋放物件,其原始碼如下:
public final boolean release(int arg) {
if (tryRelease(arg)) { //如果釋放鎖成功
Node h = head;
// 執行緒A呼叫acquire()獲取到鎖之後,A執行緒節點變為head,然後A呼叫release 釋放鎖,存在兩種情況:
// 1、 如果有新的執行緒B入隊,B成為後繼節點,B會將A狀態改為SIGNAL,那麼(h != null && h.waitStatus != 0 )成立,unparkSuccessor()喚醒後繼節點
// 2、如果A後面沒有節點,A狀態是預設值:0 ,那麼h.waitStatus != 0 不成立,直接返回true,不需要喚醒後繼節點。
if (h != null && h.waitStatus != 0) // 頭節點不為空並且頭節點狀態不為0
unparkSuccessor(h); //由head喚醒後繼節點
return true;
}
return false;
}
release()總結:
//獲取共用資源,響應中斷
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted()) //讀取執行緒中斷標記,然後擦除標記
throw new InterruptedException(); //中斷標記為true,丟擲中斷異常,停止執行
if (tryAcquireShared(arg) < 0) //呼叫子類實現方法 獲取資源
doAcquireSharedInterruptibly(arg); //沒有獲取到,那麼再嘗試獲取(進入佇列排隊等待)
}
獲取共用資源流程圖:
acquireSharedInterruptibly()總結:
//獲取共用資源,響應中斷
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED); //增加等待節點
boolean failed = true;
try {
for (;;) {//無限迴圈,直到r>0
final Node p = node.predecessor(); // p表示 剛插入節點的前驅
//1、如果前驅是head
if (p == head) {
int r = tryAcquireShared(arg);//呼叫子類實現方法 嘗試獲取共用資源
if (r >= 0) { // >0 表示 獲取到資源
// 1、如果是ReentrantReadWriteLock、CountDownLatch ,有可能r=1
// 2、如果是Semaphore,有可能r=0
// 1、2 都呼叫setHeadAndPropagate進行共用傳播判斷
setHeadAndPropagate(node, r);// 更新head並進行共用傳播
p.next = null; // 將佇列頭節點的next域置空,之後,這個節點將被GC回收
failed = false;
return;
}
}
// 2、前驅不是head
//執行緒park阻塞,直至被unPark喚醒,或者被其它執行緒中斷喚醒
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException(); //進入這裡表示執行緒中斷標記為true,那麼丟擲中斷異常
}
} finally {
if (failed) //當try 程式碼塊有異常:中斷異常 或 其他未知異常,failed才是true
cancelAcquire(node);//取消獲取資源
}
}
doAcquireSharedInterruptibly()總結:
setHeadAndPropagate在獲取共用資源的時候被呼叫
// 設定 同步佇列的head節點,以及觸發"傳播"操作
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 記錄更新前的head
setHead(node); //引數node 成為新的head
//判斷:
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next; //獲取node後繼
//後繼為空或者後繼是等待共用資源的節點
if (s == null || s.isShared())
doReleaseShared(); //釋放共用資源
}
}
滿足呼叫doReleaseShared的條件分析:
propagate > 0:
ReentrantReadWriteLock、CountDownLatch 呼叫tryAcquireShared()返回1進入,滿足條件;Semaphore 進入,propagate可能等於0,不滿足,繼續2
h == null:
h == null 表示舊head變為null,程式沒有地方設定head=null,並且這裡h參照著head意味著head不會被GC。 因此,h == null不滿足條件,繼續3 【不知道哪種情況下h==null todo】
h.waitStatus < 0
setHeadAndPropagate()總結:
//遍歷同步佇列,調整節點狀態,喚醒待申請節點
private void doReleaseShared() {
for (;;) {
Node h = head;
//1、head 不等於 tail 且不等於 null
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { //如果head狀態為signal ,cas修改為0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h); //喚醒後繼
}
//如果節點的後繼還沒有將其前驅改為signal,這裡ws==0是成立的
else if (ws == 0 && //如果head狀態為0,cas修改為propagate
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // 如果在int ws = h.waitStatus; 之後,後繼將head節點改為signal,那麼cas失敗,continue繼續迴圈後, if (ws == Node.SIGNAL) 滿足,那麼將會喚醒後繼。
}
// 只有head沒有發生變化,迴圈才會結束,若head改變,繼續迴圈
if (h == head) // loop if head changed
break;
}
}
doReleaseShared()總結:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
releaseShared()方法總結:
有兩種情景,會將取消節點徹底移出連結串列:
以第一個情景為例子分析:
結論:在前驅節點獲取資源時,後繼也能夠有機會申請資源,不需要等待前驅通過releaseShare()來喚醒。
分析如下:
1:A B 先後進入佇列
2:A被喚醒,獲得資源,呼叫setHeadAndPropagate(),晉升為head
3、B呼叫shouldParkAfterFailedAcquire(),嘗試將A狀態改為signal但未執行
4、A進入doReleaseShared(),A狀態等於0(3還沒執行),進入ws == 0 分支處理。
5、此時3執行完成,B將A的狀態改為signal,然後B park阻塞
6、A執行compareAndSetWaitStatus(h, 0, Node.PROPAGATE)失敗,continue繼續
7、A進入(ws == Node.SIGNAL)分支,執行compareAndSetWaitStatus(h, Node.SIGNAL, 0)成功,然後再執行unparkSuccessor(),將B喚醒。
8、A將B喚醒後,A去執行拿到資源後的操作,B也成功拿到資源並執行。
因為步驟6的continue,B不需要等待A執行releaseShare()被喚醒,在A獲取到資源時同時B也能快速獲取到資源,A、B可以同時執行獲得資源後的任務