JUC是Java中一個包 java.util.concurrent 。在這個包下,基本存放了Java中一些有關並行的類,包括並行工具,並行集合,鎖等。
AQS(抽象佇列同步器)是JUC下的一個基礎類,大多數的並行工具都是基於AQS實現的。
AQS本質並沒有實現太多的業務功能,只是對外提供了三點核心內容,來幫助實現其他的並行內容。
三點核心內容:
ReentrantLock就是基於AQS實現的。ReentrantLock類中維護這個一個內部抽象類Sync,他繼承了AQS類。ReentrantLock的lock和unlock方法就是呼叫的Sync的方法。
AQS流程(簡述)
1. 當new了一個ReentrantLock時,AQS預設state值為0, head 和 tail 都為null;
2. A執行緒執行lock方法,獲取鎖資源。
3. A執行緒將state通過cas操作從0改為1,代表獲取鎖資源成功。
4. B執行緒要獲取鎖資源時,鎖資源被A執行緒持有。
5. B執行緒獲取鎖資源失敗,需要新增到雙向連結串列中排隊。
6. 掛起B執行緒,等待A執行緒釋放鎖資源,再喚醒掛起的B執行緒。
7. A執行緒釋放鎖資源,將state從1改為0,再喚醒head.next節點。
8. B執行緒就可以重新嘗試獲取鎖資源。
注: 修改AQS雙向連結串列時要保證一個私有屬性變化和兩個共有屬性變化,只需要讓tail變化保證原子性即可。不能先改tail(會破壞雙向連結串列)
ReentrantLock中的lock方法實際是執行的Sync的lock方法。
Sync是一個抽象類,繼承了AQS
Sync有兩個子類實現:
Sync的lock方法實現:
// 非公平鎖 final void lock() { // CAS操作,嘗試將state從0改為1 // 成功就拿到鎖資源, 失敗執行acquire方法 if (compareAndSetState(0, 1)) // 成功就設定互斥鎖的為當前執行緒擁有 setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } // 公平鎖 final void lock() { acquire(1); }
如果CAS操作沒有成功,需要執行acquire方法走後續
acquire方法是AQS提供的,公平和非公平都是走的這個方法
public final void acquire(int arg) { // 1. tryAcquire方法: 再次嘗試拿鎖 // 2. addWaiter方法: 沒有獲取到鎖資源,去排隊 // 3. acquireQueued方法:掛起執行緒和後續被喚醒繼續獲取鎖資源的邏輯 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 如果這個過程中出現中斷,在整個過程結束後再自我中斷 selfInterrupt(); }
在AQS中tryAcquire是沒有具體實現邏輯的,AQS直接在tryAcquire方法中丟擲異常
在公平鎖和非公平鎖中有自己的實現。
// 非公平鎖 protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } // 非公平鎖再次嘗試拿鎖 (注:該方法屬於Sync類中) final boolean nonfairTryAcquire(int acquires) { // 獲取當前執行緒物件 final Thread current = Thread.currentThread(); // 獲取state狀態 int c = getState(); // state是不是沒有執行緒持有鎖資源,可以嘗試獲取鎖 if (c == 0) { // 再次CAS操作嘗試修改state狀態從0改為1 if (compareAndSetState(0, acquires)) { // 成功就設定互斥鎖的為當前執行緒擁有 setExclusiveOwnerThread(current); return true; } } // 鎖資源是否被當前執行緒所持有 (可重入鎖) else if (current == getExclusiveOwnerThread()) { // 持有鎖資源為當前, 則對state + 1 int nextc = c + acquires; // 健壯性判斷 if (nextc < 0) // overflow // 超過最大鎖重入次數會拋異常(機率很小,理論上存在) throw new Error("Maximum lock count exceeded"); // 設定state狀態,代表鎖重入成功 setState(nextc); return true; } return false; }
// 公平鎖 protected final boolean tryAcquire(int acquires) { // 獲取當前執行緒物件 final Thread current = Thread.currentThread(); // 獲取state狀態 int c = getState(); // state是不是沒有執行緒持有鎖資源 if (c == 0) { // 當前鎖資源沒有被其他執行緒持有 // hasQueuedPredecessors方法: 鎖資源沒有被持有,進入佇列排隊 // 排隊規則: // 1. 檢查佇列沒有執行緒排隊,搶鎖。 // 2. 檢查佇列有執行緒排隊,檢視當前執行緒是否排在第一位,如果是搶鎖,否則入佇列(注:該方法只是判斷,沒有真正入佇列) if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { // 再次CAS操作嘗試, 成功就設定互斥鎖的為當前執行緒擁有 setExclusiveOwnerThread(current); return true; } } // 鎖資源是否被當前執行緒所持有 (可重入鎖) else if (current == getExclusiveOwnerThread()) { // 持有鎖資源為當前, 則對state + 1 int nextc = c + acquires; // 健壯性判斷 if (nextc < 0) // 超過最大鎖重入次數會拋異常(機率很小,理論上存在) throw new Error("Maximum lock count exceeded"); // 設定state狀態,代表鎖重入成功 setState(nextc); return true; } return false; }
addWaiter方法,就是將當前執行緒封裝為Node物件,並且插入到AQS的雙向連結串列。
// 執行緒入佇列排隊 private Node addWaiter(Node mode) { // 將當前物件封裝為Node物件 // Node.EXCLUSIVE 表示互斥 Node.SHARED 表示共用 Node node = new Node(Thread.currentThread(), mode); // 獲取tail節點 Node pred = tail; // 判斷雙向連結串列佇列有沒有初始化 if (pred != null) { // 將當前執行緒封裝的Node節點prev屬性指向tail尾節點 node.prev = pred; // 通過CAS操作設定當前執行緒封裝的Node節點為尾節點 if (compareAndSetTail(pred, node)) { // 成功則將上一個尾節點的next屬性指向當前執行緒封裝的Node節點 pred.next = node; return node; } } // 沒有初始化head 和 tail 都等於null // enq方法: 插入雙向連結串列和初始化雙向連結串列 enq(node); // 完成節點插入 return node; } // 插入雙向連結串列和初始化雙向連結串列 private Node enq(final Node node) { // 死迴圈 for (;;) { // 獲取當前tail節點 Node t = tail; // 判斷尾節點是否初始 if (t == null) { // Must initialize // 通過CAS操作初始化初始化一個虛擬的Node節點,賦給head節點 if (compareAndSetHead(new Node())) tail = head; } else { // 完成當前執行緒Node節點加入AQS雙向連結串列的過程 // 當前執行緒封裝的Node的上一個prev屬性指向tail節點 // 流程: 1. prev(私有) ---> 2. tail(共有) ---> 3. next (共有) node.prev = t; // 通過CAS操作修改tail尾節點指向當前執行緒封裝的Node if (compareAndSetTail(t, node)) { // 將當前執行緒封裝的Node節點賦給上一個Node的下一個next屬性 t.next = node; return t; } } } }
acquireQueued方法主要就是執行緒掛起以及重新嘗試獲取鎖資源的地方
重新獲取鎖資源主要有兩種情況:
// 當前執行緒Node新增到AQS佇列後續操作 final boolean acquireQueued(final Node node, int arg) { // 標記,記錄拿鎖狀態 失敗 boolean failed = true; try { // 中斷狀態 boolean interrupted = false; // 死迴圈 for (;;) { // 獲取當前節點的上一個節點 prev final Node p = node.predecessor(); // 判斷當前節點是否是head,是則代表當前節點排在第一位 // 如果是第一位,執行tryAcquire方法嘗試拿鎖 if (p == head && tryAcquire(arg)) { // 都成功,代表拿到鎖資源 // 將當前執行緒Node設定為head節點,同時將Node的thread 和 prev屬性設定為null setHead(node); // 將上一個head的next屬性設定為null,等待GC回收 p.next = null; // help GC // 拿鎖狀態 成功 failed = false; // 返回中斷狀態 return interrupted; } // 沒有獲取到鎖 --- 嘗試掛起執行緒 // shouldParkAfterFailedAcquire方法: 掛起執行緒前的準備 // parkAndCheckInterrupt方法: 掛起當前執行緒 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 設定中斷執行緒狀態 interrupted = true; } } finally { // 取消節點 if (failed) cancelAcquire(node); } } // 檢查並更新無法獲取鎖節點的狀態 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 獲取上一個節點的ws狀態 /** * SIGNAL(-1) 表示當前節點釋放鎖的時候,需要喚醒下一個節點。或者說後繼節點在等待當前節點喚醒,後繼節點入隊時候,會將前驅節點更新給signal。 * CANCELLED(1) 表示當前節點已取消排程。當timeout或者中斷情況下,會觸發變更為此狀態,進入該狀態後的節點不再變化。 * CONDITION(-2) 當其他執行緒呼叫了condition的signal方法後,condition狀態的節點會從等待佇列轉移到同步佇列中,等待獲取同步鎖。 * PROPAGATE(-3) 表示共用模式下,前驅節點不僅會喚醒其後繼節點,同時也可能喚醒後繼的後繼節點。 * 預設(0) 新節點入隊時候的預設狀態。 */ int ws = pred.waitStatus; // 判斷上個節點ws狀態是否是 -1, 是則掛起 if (ws == Node.SIGNAL) return true; if (ws > 0) { /** * 判斷上個節點是否是取消或者其他狀態。 * 向前找到不是取消狀態的節點,修改ws狀態。 * 注意:那些放棄的結點,由於被自己「加塞」到它們前邊,它們相當於形成一個無參照鏈, * 稍後就會被GC回收,這個操作實際是把佇列中的cancelled節點剔除掉。 */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 如果前驅節點正常,那就把上一個節點的狀態通過CAS的方式設定成-1 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } // 掛起當前執行緒 private final boolean parkAndCheckInterrupt() { // 掛起當前執行緒 LockSupport.park(this); // 返回中斷標誌 return Thread.interrupted(); }
// 互斥鎖模式 解鎖 public final boolean release(int arg) { // 嘗試是否可以解鎖 if (tryRelease(arg)) { Node h = head; // 判斷雙連結串列是否存線上程排隊 if (h != null && h.waitStatus != 0) // 喚醒後續執行緒 unparkSuccessor(h); return true; } return false; } // 嘗試是否可以解鎖 protected final boolean tryRelease(int releases) { // 鎖狀態 = 狀態 - 1 int c = getState() - releases; // 判斷鎖是是否是當前執行緒持有 if (Thread.currentThread() != getExclusiveOwnerThread()) // 當前執行緒沒有持有丟擲異常 throw new IllegalMonitorStateException(); boolean free = false; // 當前鎖狀態變為0,則清空鎖歸屬執行緒 if (c == 0) { free = true; setExclusiveOwnerThread(null); } // 設定鎖狀態為0 setState(c); return free; } // 喚醒執行緒 private void unparkSuccessor(Node node) { // 獲取頭節點的狀態 int ws = node.waitStatus; if (ws < 0) // 通過CAS將頭節點的狀態設定為初始狀態 compareAndSetWaitStatus(node, ws, 0); // 後繼節點 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; // 從尾節點開始往前遍歷,尋找離頭節點最近的等待狀態正常的節點 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 真正的喚醒操作 LockSupport.unpark(s.thread); }
以上僅供參考!!