Java中AQS的原理與實現

2023-07-09 06:00:38

目錄

1:什麼是AQS?

2:AQS都有那些用途?

3:我們如何使用AQS

4:AQS的實現原理

5:對AQS的設計與實現的一些思考

1:什麼是AQS

​ 隨著計算機的算力越來越強大,各種各樣的並行程式設計模型也隨即踴躍而來,但當我們要在平行計算中使用共用資源的時候,就需要利用一種手段控制對共用資源的存取和修改來保證我們程式的正確的執行。而Java中除了在語言級別實現的synchronized鎖之外,還有另一種對共用資源存取控制的實現,而這些實現都依賴於一個叫做抽象佇列同步器(AbstractQueuedSynchronizer)的模板框架類,簡稱AQS。所以我們想要更深刻的理解Java中對共用資源的存取控制,就不可避免的要對AQS深入的瞭解和探討。

​ 我們首先看一下官方對於AQS的描述:提供一個依賴先進先出(FIFO)等待佇列的掛起鎖和相關同步器(號誌、事件等)框架。此類被設計為大多數型別的同步器的基礎類別,這些同步器依賴於單個原子int值來表示狀態。子類必須定義更改該狀態的受保護方法,以及定義該狀態在獲取或釋放該物件方面的含義。考慮到這些,類中的其他方法實現排隊和掛起機制。子類可以維護狀態列位,但只有使用方法getState、setState和compareAndSetState方法才可以更改狀態。

2:AQS有哪些用途

獲取鎖的入口方法

  public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            //acquireQueued方法會返回執行緒在掛起過程中是否被中斷,然後返回執行緒中斷的狀態
            selfInterrupt();
    }

可以看到獲取鎖的方法是一個由final修飾的不可被子類重寫的方法,首先呼叫了上面的模板方法(必須由子類重寫獲取邏輯)獲取鎖

如果獲取成功則獲取鎖流程執行結束。否則執行addWaiter方法執行入隊邏輯。

執行緒入隊方法

    private Node addWaiter(Node mode) {
        //mode執行緒獲取鎖的模式(獨佔或者共用)
        Node node = new Node(Thread.currentThread(), mode);
        //嘗試快速入隊,失敗則執行完整入隊邏輯
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            //如果尾節點不為null,則以原子方式把當前節點設定為尾節點並返回
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //如果尾節點不存(說明頭節點和尾節點未初始化)在或者由於競爭導致一次設定尾節點失敗,
        //則執行完整入隊邏輯(會進行頭節點和尾節點初始化的工作)
        enq(node);
        return node;
    }

addWaiter方法會先進行快速入隊操作,如果快速入隊失敗(由於競爭或者頭、尾節點未初始化),則進行完整入隊操作(如果頭、尾節點未初始化的話會先進行初始化操作)

完整入隊邏輯

    private Node enq(final Node node) {
        //自旋把當前節點連結到尾節點之後
        for (;;) {
            Node t = tail;
            if (t == null) {
                //尾節為空,說明佇列為空,則需要進行頭節點和尾節點的初始化
                //這裡直接new Node(),一個虛節點作為頭節點,然後將頭節點和尾節點相同
                //這裡說明頭節點和尾節點不儲存資料
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                //尾節點不為空,使用cas把當前節點設定為尾節點
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

enq方法會利用自旋先檢查頭節點和尾節點是否初始化,如果未初始化的話則先進行初始化。初始化完成之後以原子的方式插入node到隊尾並且插入成功之後返回此節點。

掛起執行緒並等待獲取鎖

final boolean acquireQueued(final Node node, int arg) {
    	//是否失敗,此執行緒被中斷可能失敗
        boolean failed = true;
        try {
            //執行緒是否被中斷
            boolean interrupted = false;
            //自旋一直獲取鎖
            for (;;) {
                //獲取當前節點的前驅節點
                final Node p = node.predecessor();
                //如果當前節點的前驅節點是頭節點(因為頭節點是虛節點,所以當前節點可以獲取鎖),
                //且當前節點獲取所成功
                if (p == head && tryAcquire(arg)) {
                    //設定node為頭節點,因為當前節點已經獲取鎖成功了,當前節點需要作為頭節點
                    setHead(node);
                    p.next = null; // help GC
                    //設定失敗標誌為false
                    failed = false;
                    //返回中斷狀態
                    return interrupted;
                }
                //shouldParkAfterFailedAcquire方法檢查並更新獲取失敗的節點的狀態,如果執行緒應該掛起則返回true
                //parkAndCheckInterrupt則掛起執行緒並返回是否中斷
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            //失敗,則取消獲取鎖
            if (failed)
                cancelAcquire(node);
        }
    }

​ 通過上面流程分析可知,獲取鎖失敗,會呼叫addWaiter方法把當前節點放到隊尾,那麼執行緒入隊之後什麼時候掛起執行緒,什麼時候出隊,我們一點一點分析acquireQueued方法這些問題就會逐漸顯露出來。

​ 首先該方法會一直自旋獲取鎖(中間可能會被掛起,防止無效自旋),判斷當前節點的前驅節點是否是頭節點來判斷當前是否有獲取鎖的資格,如果有的話則設定當前節點為頭節點並返回中斷狀態。否則呼叫shouldParkAfterFailedAcquire判斷獲取鎖失敗後是否可以掛起,可以的話則呼叫parkAndCheckInterrupt進行執行緒掛起操作。

設定頭節點

    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

注:setHead方法是把當前節點置為虛節點,但並沒有修改waitStatus,因為其他地方要用到。

檢查並更新獲取失敗的節點的狀態

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * node的狀態已經是SIGNAL可以安全的掛起,直接返回true 
             */
            return true;
        if (ws > 0) {
            /*
             *	由之前的waitStatus變數列舉值可知,waitStatus大於0為取消狀態,直接跳過此節點
             */
            do {
                //重新連結prev指標,至於為什麼沒有操作next指標後面會通過程式碼解釋
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * 原子方式設定waitStatus的值為SIGNAL
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

掛起並檢查執行緒的中斷狀態

    private final boolean parkAndCheckInterrupt() {
    	//使用LockSupport掛起此執行緒
        LockSupport.park(this);
        //返回並清除中斷狀態
        return Thread.interrupted();
    }

取消獲取鎖

private void cancelAcquire(Node node) {
        // 忽略不存在的節點
        if (node == null)
            return;
		//設定當前節點不持有執行緒
        node.thread = null;

        // 跳過取消的前驅節點
        Node pred = node.prev;
    	//waitStatus>0未已取消的節點
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // 未取消的節點的後繼節點
        Node predNext = pred.next;

        //設定狀態未取消狀態
        node.waitStatus = Node.CANCELLED;

        // 如果node為尾節點,設定pred為尾節點,然後設定尾節點的下一個節點為null
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            int ws;
        	// 如果當前節點不是head的後繼節點,
            //1:判斷當前節點前驅節點的是否為SIGNAL,
            //2:如果不是,則把前驅節點設定為SINGAL看是否成功
    		// 如果1和2中有一個為true,再判斷當前節點的執行緒是否為null
   	 		// 如果上述條件都滿足,把當前節點的前驅節點的後繼指標指向當前節點的後繼節點
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    //為什麼沒有自旋,如果此處設定失敗,下次仍然會丟掉predNext到next中間節點,所以不會出現問題
                    compareAndSetNext(pred, predNext, next);
            } else {
                //當前節點是頭節點的後繼節點或者上述條件都不滿足
                unparkSuccessor(node);
            }
			//為什麼此處能help GC,不得不多說Doug Lea大神心思之縝密,考慮之周全。
            //解釋參考http://www.codebaoku.com/question/question-sd-1010000043795300.html
            node.next = node; // help GC
        }
    }

當node==tail時,節點變化情況如下圖

當pred==head時,節點的變化情況如下圖

4.4:釋放鎖原始碼及流程分析

釋放鎖流程圖如下:

    public final boolean release(int arg) {
        //嘗試釋放鎖成功
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        //嘗試釋放鎖失敗
        return false;
    }

釋放鎖的流程就比較簡單了,先嚐試釋放鎖,如果釋放鎖成功,(如果頭節點不為null且頭節點的狀態不等於0則釋放頭節點的後繼節點)返回true,否則返回false。

5:對AQS的設計與實現的一些思考

​ 1:設計方面,AQS作為底層一個通用的模板框架類,就要考慮到一些易用性和擴充套件性,比如作者模板方法使用的丟擲異常,而不是作為抽象方法強制使用者實現所有的模板方法,而是使用者可以自由選擇要使用的方法和特性選擇性實現模板方法,當然也犧牲掉了一些其他東西,比如設計原則的最小職責性。這也就體現了一些折衷的思想,任何方案都不是完美的,我們只有權衡利弊之後達到一個相對完美的方案。

​ 2:實現方面,AQS的實現不得不令人驚歎,每一次讀都會想到書讀百遍,其意自現這句名言,每次讀有不一樣的收穫,看似一行不經意的程式碼,體現了作者對每一行程式碼細緻又獨到的思考。在讀AQS程式碼的時候參考下面的連結,看大牛對AQS的的理解時見解,不僅加深了我對AQS核心思想的理解,也讓我從另一方面看到了AQS的優秀之處(由於個人水平有限,理解不到位或者錯誤還請各位道友不吝賜教)。路漫漫其修遠兮,吾將上下而求索。

參考:

https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html

//help GC 相關的解釋
http://www.codebaoku.com/question/question-sd-1010000043795300.html