jdk執行緒池ThreadPoolExecutor優雅停止原理解析(自己動手實現執行緒池)(二)

2022-11-18 06:00:50

jdk執行緒池工作原理解析(二)

本篇部落格是jdk執行緒池ThreadPoolExecutor工作原理解析系列部落格的第二篇,在第一篇部落格中從原始碼層面分析了ThreadPoolExecutor在RUNNING狀態下處理任務的核心邏輯,而在這篇部落格中將會詳細講解jdk執行緒池ThreadPoolExecutor優雅停止的實現原理。

ThreadPoolExecutor優雅停止原始碼分析(自己動手實現執行緒池v2版本)

ThreadPoolExecutor為了實現優雅停止功能,為執行緒池設定了一個狀態屬性,其共有5種情況。
在第一篇部落格中曾介紹過,AtomicInteger型別的變數ctl同時維護了兩個業務屬性當前活躍工作執行緒個數與執行緒池狀態,其中ctl的高3位用於存放執行緒池狀態。

執行緒池工作狀態介紹

執行緒池工作狀態是單調推進的,即從執行時->停止中->完全停止。共有以下五種情況

1. RUNNING

RUNNING狀態,代表著執行緒池處於正常執行(執行時)。RUNNING狀態的執行緒池能正常的接收並處理提交的任務
ThreadPoolExecutor初始化時對ctl賦予的預設屬性便是RUNNING(private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));)
RUNNING狀態下執行緒池正常工作的原理已經在第一篇部落格中詳細的介紹過了,這裡不再贅述。

2. SHUTDOWN

SHUTDOWN狀態,代表執行緒池處於停止對外服務的狀態(停止中)。不再接收新提交的任務,但依然會將workQueue工作佇列中積壓的任務逐步處理完。
使用者可以通過呼叫shutdown方法令執行緒池由RUNNING狀態進入SHUTDOWN狀態,shutdown方法會在下文詳細展開分析。

3. STOP

STOP狀態,代表執行緒池處於停止狀態。不再接受新提交的任務(停止中),同時也不再處理workQueue工作佇列中積壓的任務,當前還在處理任務的工作執行緒將收到interrupt中斷通知
使用者可以通過呼叫shutdownNow方法令執行緒池由RUNNING或者SHUTDOWN狀態進入STOP狀態,shutdownNow方法會在下文詳細展開分析。

4. TIDYING

TIDYING狀態,代表著執行緒池即將完全終止,正在做最後的收尾工作(停止中)。
線上程池中所有的工作執行緒都已經完全退出,且工作佇列中的任務已經被清空時會由SHUTDOWN或STOP狀態進入TIDYING狀態。

5. TERMINATED

TERMINATED狀態,代表著執行緒池完全的關閉(完全停止)。

public class MyThreadPoolExecutorV2 implements MyThreadPoolExecutor {
    /**
     * 當前執行緒池中存在的worker執行緒數量 + 狀態的一個聚合(通過一個原子int進行cas,來避免對兩個業務屬性欄位加鎖來保證一致性)
     */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;

    /**
     * 32位元的有符號整數,有3位是用來存放執行緒池狀態的,所以用來維護當前工作執行緒個數的部分就只能用29位了
     * 被佔去的3位中,有1位原來的符號位,2位是原來的數值位
     * */
    private static final int CAPACITY = (1 << COUNT_BITS) - 1;

    /**
     * 執行緒池狀態poolStatus常數(狀態值只會由小到大,單調遞增)
     * 執行緒池狀態遷移圖:
     *         ↗ SHUTDOWN ↘
     * RUNNING       ↓       TIDYING → TERMINATED
     *         ↘   STOP   ↗
     * 1 RUNNING狀態,代表著執行緒池處於正常執行的狀態。能正常的接收並處理提交的任務
     * 執行緒池物件初始化時,狀態為RUNNING
     * 對應邏輯:private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
     *
     * 2 SHUTDOWN狀態,代表執行緒池處於停止對外服務的狀態。不再接收新提交的任務,但依然會將workQueue工作佇列中積壓的任務處理完
     * 呼叫了shutdown方法時,狀態由RUNNING -> SHUTDOWN
     * 對應邏輯:shutdown方法中的advanceRunState(SHUTDOWN);
     *
     * 3 STOP狀態,代表執行緒池處於停止狀態。不再接受新提交的任務,同時也不再處理workQueue工作佇列中積壓的任務,當前還在處理任務的工作執行緒將收到interrupt中斷通知
     * 之前未呼叫shutdown方法,直接呼叫了shutdownNow方法,狀態由RUNNING -> STOP
     * 之前先呼叫了shutdown方法,後呼叫了shutdownNow方法,狀態由SHUTDOWN -> STOP
     * 對應邏輯:shutdownNow方法中的advanceRunState(STOP);
     *
     * 4 TIDYING狀態,代表著執行緒池即將完全終止,正在做最後的收尾工作
     * 當前執行緒池狀態為SHUTDOWN,任務被消費完工作佇列workQueue為空,且工作執行緒全部退出完成工作執行緒集合workers為空時,tryTerminate方法中將狀態由SHUTDOWN->TIDYING
     * 當前執行緒池狀態為STOP,工作執行緒全部退出完成工作執行緒集合workers為空時,tryTerminate方法中將狀態由STOP->TIDYING
     * 對應邏輯:tryTerminate方法中的ctl.compareAndSet(c, ctlOf(TIDYING, 0)
     *
     * 5 TERMINATED狀態,代表著執行緒池完全的關閉。之前執行緒池已經處於TIDYING狀態,且呼叫的勾點函數terminated已返回
     * 當前執行緒池狀態為TIDYING,呼叫的勾點函數terminated已返回
     * 對應邏輯:tryTerminate方法中的ctl.set(ctlOf(TERMINATED, 0));
     * */

    //  11100000 00000000 00000000 00000000
    private static final int RUNNING = -1 << COUNT_BITS;
    //  00000000 00000000 00000000 00000000
    private static final int SHUTDOWN = 0 << COUNT_BITS;
    //  00100000 00000000 00000000 00000000
    private static final int STOP = 1 << COUNT_BITS;
    //  01000000 00000000 00000000 00000000
    private static final int TIDYING = 2 << COUNT_BITS;
    //  01100000 00000000 00000000 00000000
    private static final int TERMINATED = 3 << COUNT_BITS;

    private static int runStateOf(int c) {
        return c & ~CAPACITY;
    }
    
    private static int ctlOf(int rs, int wc) {
        return rs | wc;
    }
    
    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

    /**
     * 推進執行緒池工作狀態
     * */
    private void advanceRunState(int targetState) {
        for(;;){
            // 獲得當前的執行緒池狀態
            int currentCtl = this.ctl.get();

            // 1 (runState >= targetState)如果當前執行緒池狀態不比傳入的targetState小
            // 代表當前狀態已經比引數要制定的更加快(或者至少已經處於對應階段了),則無需更新poolStatus的狀態(或語句中第一個條件為false,直接break了)
            // 2  (this.ctl.compareAndSet),cas的將runState更新為targetState
            // 如果返回true則說明cas更新成功直接break結束(或語句中第一個條件為false,第二個條件為true)
            // 如果返回false說明cas爭搶失敗,再次進入while迴圈重試(或語句中第一個和第二個條件都是false,不break而是繼續執行迴圈重試)
            if (runStateAtLeast(currentCtl, targetState) ||
                    this.ctl.compareAndSet(
                            currentCtl,
                            ctlOf(targetState, workerCountOf(currentCtl)
                            ))) {
                break;
            }
        }
    }
}    
  • 因為執行緒池狀態不是單獨存放,而是放在ctl這一32位元資料的高3位的,讀寫都比較麻煩,因此提供了runStateOf和ctlOf等輔助方法(位運算)來簡化操作。
  • 執行緒池的狀態是單調遞進的,由於巧妙的將狀態靠前的值設定的更小,因此通過直接比較狀態的值來判斷當前執行緒池狀態是否推進到了指定的狀態(runStateLessThan、runStateAtLeast、isRunning、advanceRunState)。

jdk執行緒池ThreadPoolExecutor優雅停止具體實現原理

執行緒池的優雅停止一般要能做到以下幾點:

  1. 執行緒池在中止後不能再受理新的任務
  2. 執行緒池中止的過程中,已經提交的現存任務不能丟失(等待剩餘任務執行完再關閉或者能夠把剩餘的任務吐出來還給使用者)
  3. 執行緒池最終關閉前,確保建立的所有工作執行緒都已退出,不會出現資源的洩露

下面我們從原始碼層面解析ThreadPoolExecutor,看看其是如何實現上述這三點的.

如何中止執行緒池

ThreadPoolExecutor執行緒池提供了shutdown和shutdownNow這兩個public方法給使用者用於發出執行緒池的停止指令。

shutdown方法

shutdown方法用於關閉執行緒池,並令執行緒池從RUNNING狀態轉變位SHUTDOWN狀態。位於SHUTDOWN狀態的執行緒池,不再接收新任務,但已提交的任務會全部被執行完。

    /**
     * 關閉執行緒池(不再接收新任務,但已提交的任務會全部被執行)
     * 但不會等待任務徹底的執行完成(awaitTermination)
     */
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;

        // shutdown操作中涉及大量的資源存取和更新,直接通過互斥鎖防並行
        mainLock.lock();
        try {
            // 用於shutdown/shutdownNow時的安全存取許可權
            checkShutdownAccess();
            // 將執行緒池狀態從RUNNING推進到SHUTDOWN
            advanceRunState(SHUTDOWN);
            // shutdown不會立即停止所有執行緒,而僅僅先中斷idle狀態的多餘執行緒進行回收,還在執行任務的執行緒就慢慢等其執行完
            interruptIdleWorkers();
            // 單獨為ScheduledThreadPoolExecutor開的一個勾點函數(hook for ScheduledThreadPoolExecutor)
            onShutdown();
        } finally {
            mainLock.unlock();
        }

        // 嘗試終止執行緒池
        tryTerminate();
    }

    /**
     * 用於shutdown/shutdownNow時的安全存取許可權
     * 檢查當前呼叫者是否有許可權去通過interrupt方法去中斷對應工作執行緒
     * */
    private void checkShutdownAccess() {
        // 判斷jvm啟動時是否設定了安全管理器SecurityManager
        SecurityManager security = System.getSecurityManager();
        // 如果沒有設定,直接返回無事發生

        if (security != null) {
            // 設定了許可權管理器,驗證當前呼叫者是否有modifyThread的許可權
            // 如果沒有,checkPermission會丟擲SecurityException異常
            security.checkPermission(shutdownPerm);
    
            // 通過上述校驗,檢查工作執行緒是否能夠被呼叫者存取
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (MyWorker w : workers) {
                    // 檢查每一個工作執行緒中的thread物件是否有許可權被呼叫者存取
                    security.checkAccess(w.thread);
                }
            } finally {
                mainLock.unlock();
            }
        }
    }

    /**
     * 中斷所有處於idle狀態的執行緒
     * */
    private void interruptIdleWorkers() {
        // 預設打斷所有idle狀態的工作執行緒
        interruptIdleWorkers(false);
    }

    private static final boolean ONLY_ONE = true;

    /**
     * 中斷處於idle狀態的執行緒
     * @param onlyOne 如果為ture,至多隻中斷一個工作執行緒(可能一個都不中斷)
     *                如果為false,中斷workers內註冊的所有工作執行緒
     * */
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (MyWorker w : workers) {
                Thread t = w.thread;
                // 1. t.isInterrupted(),說明當前執行緒存在中斷訊號,之前已經被中斷了,無需再次中斷
                // 2. w.tryLock(), runWorker方法中如果工作執行緒獲取到任務開始工作,會先進行Lock加鎖
                // 則這裡的tryLock會加鎖失敗,返回false。 而返回true的話,就說明當前工作執行緒是一個idle執行緒,需要被中斷
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        // tryLock成功時,會將內部state的值設定為1,通過unlock恢復到未加鎖的狀態
                        w.unlock();
                    }
                }
                if (onlyOne) {
                    // 引數onlyOne為true,至多隻中斷一個工作執行緒
                    // 即使上面的t.interrupt()沒有執行,也在這裡跳出迴圈
                    break;
                }
            }
        } finally {
            mainLock.unlock();
        }
    }
    
    /**
     * 單獨為jdk的ScheduledThreadPoolExecutor開的一個勾點函數
     * 由ScheduledThreadPoolExecutor繼承ThreadExecutor時重寫(包級別存取許可權)
     * */
    void onShutdown() {}
  1. shutdown方法在入口處使用mainLock加鎖後,通過checkShutdownAccess檢查當前是否有許可權存取工作執行緒(前提是設定了SecurityManager),如果無許可權則會丟擲SecurityException異常。
  2. 通過advanceRunState方法將執行緒池狀態推進到SHUTDOWN。
  3. 通過interruptIdleWorkers使用中斷指令(Thread.interrupt)喚醒所有處於idle狀態的工作執行緒(存在idle狀態的工作執行緒代表著當前工作佇列是空的)。
    idle的工作執行緒在被喚醒後從getTask方法中退出(getTask中對應的退出邏輯在下文中展開),進而退出runWorker方法,最終系統回收掉工作執行緒佔用的各種資源(第一篇部落格中runWorker的解析中提到過)。
  4. 呼叫包級別修飾的勾點函數onShutdown。這一方法是作者專門為同為java.util.concurrent包下的ScheduledThreadPoolExecutor提供的拓展,不在本篇部落格中展開。
  5. 前面提到SHUTDOWN狀態的執行緒池在工作執行緒都全部退出且工作佇列為空時會轉變為TIDYING狀態,因此通過呼叫tryTerminate方法嘗試終止執行緒池(當前不一定會滿足條件,比如呼叫了shutdown但工作佇列還有很多工等待執行)。
    tryTerminate方法中細節比較多,下文中再展開分析。
shutdownNow方法

shutdownNow方法同樣用於關閉執行緒池,但比shutdown方法更加激進。shutdownNow方法令執行緒池從RUNNING狀態轉變為STOP狀態,不再接收新任務,而工作佇列中未完成的任務會以列表的形式返回給shutdownNow的呼叫者。

  • shutdown方法在呼叫後,雖然不再接受新任務,但會等待工作佇列中的佇列被慢慢消費掉;而shutdownNow並不會等待,而是將當前工作佇列中的所有未被撈取執行的剩餘任務全部返回給shutdownNow的呼叫者,並對所有的工作執行緒(包括非idle的執行緒)發出中斷通知。
  • 這樣做的好處是執行緒池可以更快的進入終止態,而不必等剩餘的任務都完成,都返回給使用者後也不會丟任務。
    /**
     * 立即關閉執行緒池(不再接收新任務,工作佇列中未完成的任務會以列表的形式返回)
     * @return 當前工作佇列中未完成的任務
     * */
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;

        final ReentrantLock mainLock = this.mainLock;

        // shutdown操作中涉及大量的資源存取和更新,直接通過互斥鎖防並行
        mainLock.lock();
        try {
            // 用於shutdown/shutdownNow時的安全存取許可權
            checkShutdownAccess();
            // 將執行緒池狀態從RUNNING推進到STOP
            advanceRunState(STOP);
            interruptWorkers();

            // 將工作佇列中未完成的任務提取出來(會清空執行緒池的workQueue)
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }

        // 嘗試終止執行緒池
        tryTerminate();
        return tasks;
    }

   /**
    * shutdownNow方法內,立即終止執行緒池時該方法被呼叫
    * 中斷通知所有已經啟動的工作執行緒(比如等待在工作佇列上的idle工作執行緒,或者run方法內部await、sleep等,令其丟擲中斷異常快速結束)
    * */
    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (MyWorker w : workers) {
              // 遍歷所有的worker執行緒,已啟動的工作執行緒全部呼叫Thread.interrupt方法,發出中斷訊號
              w.interruptIfStarted();
            }
        } finally {
            mainLock.unlock();
        }
    }

   /**
    * 將工作佇列中的任務全部轉移出來
    * 用於shutdownNow緊急關閉執行緒池時將未完成的任務返回給呼叫者,避免任務丟失
    * */
   private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> queue = this.workQueue;
        ArrayList<Runnable> taskList = new ArrayList<>();
        queue.drainTo(taskList);
        // 通常情況下,普通的阻塞佇列的drainTo方法可以一次性的把所有元素都轉移到taskList中
        // 但jdk的DelayedQueue或者一些自定義的阻塞佇列,drainTo方法無法轉移所有的元素
        // (比如DelayedQueue的drainTo方法只能轉移已經不需要延遲的元素,即getDelay()<=0)
        if (!queue.isEmpty()) {
           // 所以在這裡打一個修補程式邏輯:如果drainTo方法執行後工作佇列依然不為空,則通過更基礎的remove方法把佇列中剩餘元素一個一個的迴圈放到taskList中
           for (Runnable r : queue.toArray(new Runnable[0])) {
              if (queue.remove(r)) {
                taskList.add(r);
              }
           }
        }
        
        return taskList;
   }    
  1. shutdownNow方法在入口處使用mainLock加鎖後,與shutdown方法一樣也通過checkShutdownAccess檢查當前是否有許可權存取工作執行緒(前提是設定了SecurityManager),如果無許可權則會丟擲SecurityException異常。
  2. 通過advanceRunState方法將執行緒池狀態推進到STOP。
  3. 通過interruptWorkers使用中斷指令(Thread.interrupt)喚醒所有工作執行緒(區別於shutdown中的interruptIdleWorkers)。區別在於除了idle的工作執行緒,所有正在執行任務的工作執行緒也會收到中斷通知,期望其能儘快退出任務的執行。
  4. 通過drainQueue方法將當前工作執行緒中剩餘的所有任務以List的形式統一返回給呼叫者。
  5. 通過呼叫tryTerminate方法嘗試終止執行緒池。

如何保證執行緒池在中止後不能再受理新的任務?

在execute方法作為入口,提交任務的邏輯中,v2版本相比v1版本新增了一些基於執行緒池狀態的校驗(和jdk的實現保持一致了)。

execute方法中的校驗
  • 首先在execute方法中,向工作佇列加入新任務前(workQueue.offer)對當前執行緒池的狀態做了一個校驗(isRunning(currentCtl))。希望非RUNNING狀態的執行緒池不向工作佇列中新增新任務
    但在做該檢查時可能與shutdown/shutdownNow內推進執行緒池狀態的邏輯並行執行,所以在工作佇列成功加入任務後還需要再檢查一次執行緒池狀態,如果此時已經不是RUNNING狀態則需要通過remove方法將剛入隊的任務從佇列中移除,並呼叫reject方法(拒絕策略)
addWorker方法中的校驗
  • 在addWorker方法的入口處(retry:第一層迴圈通過(runState >= SHUTDOWN && !(runState == SHUTDOWN && firstTask == null && !workQueue.isEmpty())))邏輯,
    保證了不是RUNNING狀態的執行緒池(runState >= SHUTDOWN),無法建立新的工作執行緒(addWorker返回false)。
    但有一種特殊情況:即SHUTDOWN狀態下(runState == SHUTDOWN),工作佇列不為空(!workQueue.isEmpty()),且不是第一次提交任務時建立新工作執行緒(firstTask == null),
    依然允許建立新的工作執行緒,因為即使在SHUTDOWN狀態下,某一存活的工作執行緒發生中斷異常時,會呼叫processWorkerExit方法,在銷燬原有工作執行緒後依然需要呼叫addWorker重新建立一個新的(firstTask == null)
execute與shutdown/shutdownNow並行時的處理

execute提交任務時addWorker方法和shutdown/shutdownNow方法是可能並行執行的,但addWorker中有多處地方都對執行緒池的狀態進行了檢查,盡最大的可能避免執行緒池停止時繼續建立新的工作執行緒。

  1. retry迴圈中,compareAndIncrementWorkerCount方法會cas的更新狀態(此前獲取到的ctl狀態必然是RUNNING,否則走不到這裡),cas成功則會跳出retry:迴圈( break retry;)。
    而cas失敗可能有兩種情況:
    如果是workerCount發生了並行的變化,則在內層的for (;;)迴圈中進行重試即可
    如果執行緒池由於收到終止指令而推進了狀態,則隨後的if (runStateOf(currentCtl) != runState)將會為true,跳出到外層的迴圈重試(continue retry)
  2. 在new Worker(firstTask)後,使用mainLock獲取鎖後再一次檢查執行緒池狀態(if (runState < SHUTDOWN ||(runState == SHUTDOWN && firstTask == null)))。
    由於shutdown、shutdownNow也是通過mainLock加鎖後才推進的執行緒池狀態,因此這裡獲取到的狀態是準確的。
    如果校驗失敗(if結果為false),則workers中不會加入新建立的工作執行緒,臨時變數workerAdded=false,則工作執行緒不會啟動(t.start())。臨時變數workerStarted也為false,最後會呼叫addWorkerFailed將新建立的工作執行緒回收掉(回滾)

基於execute方法和addWorker方法中關於各項關於執行緒池停止狀態校驗,最大程度的避免了執行緒池在停止過程中新任務的提交和可能的新工作執行緒的建立。使得execute方法線上程池接收到停止指令後(>=SHUTDOWN),最終都會去執行reject拒絕策略邏輯。

/**
     * 提交任務,並執行
     * */
    @Override
    public void execute(Runnable command) {
        if (command == null){
            throw new NullPointerException("command引數不能為空");
        }

        int currentCtl = this.ctl.get();
        if (workerCountOf(currentCtl) < this.corePoolSize) {
            // 如果當前存在的worker執行緒數量低於指定的核心執行緒數量,則建立新的核心執行緒
            boolean addCoreWorkerSuccess = addWorker(command,true);
            if(addCoreWorkerSuccess){
                // addWorker新增成功,直接返回即可
                return;
            }

            // addWorker失敗了
            // 失敗的原因主要有以下幾個:
            // 1 執行緒池的狀態出現了變化,比如呼叫了shutdown/shutdownNow方法,不再是RUNNING狀態,停止接受新的任務
            // 2 多個執行緒並行的execute提交任務,導致cas失敗,重試後發現當前執行緒的個數已經超過了限制
            // 3 小概率是ThreadFactory執行緒工廠沒有正確的返回一個Thread

            // 獲取最新的ctl狀態
            currentCtl = this.ctl.get();
        }

        // 走到這裡有兩種情況
        // 1 因為核心執行緒超過限制(workerCountOf(currentCtl) < corePoolSize == false),需要嘗試嘗試將任務放入阻塞佇列
        // 2 addWorker返回false,建立核心工作執行緒失敗

        // 判斷當前執行緒池狀態是否為running
        // 如果是running狀態,則進一步執行任務入隊操作
        if(isRunning(currentCtl) && this.workQueue.offer(command)){
            // 執行緒池是running狀態,且workQueue.offer入隊成功

            int recheck = this.ctl.get();
            // 重新檢查狀態,避免在上面入隊的過程中執行緒池並行的關閉了
            // 如果是isRunning=false,則進一步需要通過remove操作將剛才入隊的任務刪除,進行回滾
            if (!isRunning(recheck) && remove(command)) {
                // 執行緒池關閉了,執行reject操作
                reject(command);
            } else if(workerCountOf(currentCtl) == 0){
                // 在corePoolSize為0的情況下,當前不存在存活的核心執行緒
                // 一個任務在入隊之後,如果當前執行緒池中一個執行緒都沒有,則需要兜底的建立一個非核心執行緒來處理入隊的任務
                // 因此firstTask為null,目的是先讓任務先入隊後建立執行緒去拉取任務並執行
                addWorker(null,false);
            }else{
                // 加入佇列成功,且當前存在worker執行緒,成功返回
                return;
            }
        }else{
            // 阻塞佇列已滿,嘗試建立一個新的非核心執行緒處理
            boolean addNonCoreWorkerSuccess = addWorker(command,false);
            if(!addNonCoreWorkerSuccess){
                // 建立非核心執行緒失敗,執行拒絕策略(失敗的原因和前面建立核心執行緒addWorker的原因類似)
                reject(command);
            }else{
                // 建立非核心執行緒成功,成功返回
                return;
            }
        }
    }
/**
     * 向執行緒池中加入worker
     * */
    private boolean addWorker(Runnable firstTask, boolean core) {
        // retry標識外層迴圈
        retry:
        for (;;) {
            int currentCtl = ctl.get();
            int runState = runStateOf(currentCtl);

            // Check if queue empty only if necessary.
            // 執行緒池終止時需要返回false,避免新的worker被建立
            // 1 先判斷runState >= SHUTDOWN
            // 2 runState >= SHUTDOWN時,意味著不再允許建立新的工作執行緒,但有一種情況例外
            // 即SHUTDOWN狀態下(runState == SHUTDOWN),工作佇列不為空(!workQueue.isEmpty()),還需要繼續執行
            // 比如在當前存活的執行緒發生中斷異常時,會呼叫processWorkerExit方法,在銷燬原有工作執行緒後呼叫addWorker重新建立一個新的(firstTask == null)
            if (runState >= SHUTDOWN && !(runState == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) {
                // 執行緒池已經是關閉狀態了,不再允許建立新的工作執行緒,返回false
                return false;
            }

            // 用於cas更新workerCount的內層迴圈(注意這裡面與jdk的寫法不同,改寫成了邏輯一致但更可讀的形式)
            for (;;) {
                // 判斷當前worker數量是否超過了限制
                int workerCount = workerCountOf(currentCtl);
                if (workerCount >= CAPACITY) {
                    // 當前worker數量超過了設計上允許的最大限制
                    return false;
                }
                if (core) {
                    // 建立的是核心執行緒,判斷當前執行緒數是否已經超過了指定的核心執行緒數
                    if (workerCount >= this.corePoolSize) {
                        // 超過了核心執行緒數,建立核心worker執行緒失敗
                        return false;
                    }
                } else {
                    // 建立的是非核心執行緒,判斷當前執行緒數是否已經超過了指定的最大執行緒數
                    if (workerCount >= this.maximumPoolSize) {
                        // 超過了最大執行緒數,建立非核心worker執行緒失敗
                        return false;
                    }
                }

                // cas更新workerCount的值
                boolean casSuccess = compareAndIncrementWorkerCount(currentCtl);
                if (casSuccess) {
                    // cas成功,跳出外層迴圈
                    break retry;
                }

                // 重新檢查一下當前執行緒池的狀態與之前是否一致
                currentCtl = ctl.get();  // Re-read ctl
                if (runStateOf(currentCtl) != runState) {
                    // 從外層迴圈開始continue(因為說明在這期間 執行緒池的工作狀態出現了變化,需要重新判斷)
                    continue retry;
                }

                // compareAndIncrementWorkerCount方法cas爭搶失敗,重新執行內層迴圈
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;

        MyWorker newWorker = null;
        try {
            // 建立一個新的worker
            newWorker = new MyWorker(firstTask);
            final Thread myWorkerThread = newWorker.thread;
            if (myWorkerThread != null) {
                // MyWorker初始化時內部執行緒建立成功

                // 加鎖,防止並行更新
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();

                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int runState = runStateOf(ctl.get());

                    // 重新檢查執行緒池執行狀態,滿足以下兩個條件的任意一個才建立新Worker
                    // 1 runState < SHUTDOWN
                    // 說明執行緒池處於RUNNING狀態正常執行,可以建立新的工作執行緒
                    // 2 runState == SHUTDOWN && firstTask == null
                    // 說明執行緒池呼叫了shutdown,但工作佇列不為空,依然需要新的Worker。
                    // firstTask == null標識著其不是因為外部提交新任務而建立新Worker,而是在消費SHUTDOWN前已提交的任務
                    if (runState < SHUTDOWN ||
                            (runState == SHUTDOWN && firstTask == null)) {
                        if (myWorkerThread.isAlive()) {
                            // 預檢查執行緒的狀態,剛初始化的worker執行緒必須是未喚醒的狀態
                            throw new IllegalThreadStateException();
                        }

                        // 加入worker集合
                        this.workers.add(newWorker);

                        int workerSize = workers.size();
                        if (workerSize > largestPoolSize) {
                            // 如果當前worker個數超過了之前記錄的最大存活執行緒數,將其更新
                            largestPoolSize = workerSize;
                        }

                        // 建立成功
                        workerAdded = true;
                    }
                } finally {
                    // 無論是否發生異常,都先將主控鎖解鎖
                    mainLock.unlock();
                }

                if (workerAdded) {
                    // 加入成功,啟動worker執行緒
                    myWorkerThread.start();
                    // 標識為worker執行緒啟動成功,並作為返回值返回
                    workerStarted = true;
                }
            }
        }finally {
            if (!workerStarted) {
                addWorkerFailed(newWorker);
            }
        }

        return workerStarted;
    }
    public boolean remove(Runnable task) {
        boolean removed = workQueue.remove(task);
        // 當一個任務從工作佇列中被成功移除,可能此時工作佇列為空。嘗試判斷是否滿足執行緒池中止條件
        tryTerminate();
        return removed;
    }

如何保證中止過程中不丟失任務?

  1. 通過shutdown關閉執行緒池時,SHUTDOWN狀態的執行緒池會等待所有剩餘的任務執行完畢後再進入TIDYING狀態。
  2. 通過shutdownNow關閉執行緒池時,以返回值的形式將剩餘的任務吐出來還給使用者

中止前已提交的任務不會丟失;而中止後執行緒池也不會再接收新的任務(走拒絕策略)。這兩點共同保證了提交的任務不會丟失。

如何保證執行緒池最終關閉前,所有工作執行緒都已退出?

執行緒池在收到中止命令進入SHUTDOWN或者STOP狀態時,會一直等到工作佇列為空且所有工作執行緒都中止退出後才會推進到TIDYING階段。
上面描述的條件是一個複合的條件,其只有在「收到停止指令(進入SHUTDOWN或者STOP狀態)」、"工作佇列中任務被移除或消費(工作佇列為空)"或是「工作執行緒退出(所有工作執行緒都中止退出)」這三類事件發生時才有可能滿足。
而判斷是否滿足條件並推進到TIDYING狀態的關鍵就在tryTerminate方法中。tryTerminate顧名思義便是用於嘗試終止執行緒池的,當上述任意事件觸發時便判斷是否滿足終止條件,如果滿足則將執行緒池推進到TIDYING階段。
因此在ThreadPoolExecutor中tryTerminate一共在6個地方被呼叫,分別是shutdown、shutdownNow、remove、purge、addWorkerFailed和processWorkerExit方法。

  • shutdown、shutdownNow方法觸發收到停止指令的事件
  • remove、purge方法觸發工作佇列中任務被移除的事件
  • addWorkerFailed、processWorkerExit方法觸發工作執行緒退出的事件
tryTerminate原始碼分析
   /**
     * 嘗試判斷是否滿足執行緒池中止條件,如果滿足條件,將其推進到最後的TERMINATED狀態
     * 注意:必須在任何可能觸發執行緒池中止的場景下呼叫(例如工作執行緒退出,或者SHUTDOWN狀態下佇列工作佇列為空等)
     * */
    final void tryTerminate() {
        for (;;) {
            int currentCtl = this.ctl.get();
            if (isRunning(currentCtl)
                    || runStateAtLeast(currentCtl, TIDYING)
                    || (runStateOf(currentCtl) == SHUTDOWN && !workQueue.isEmpty())) {
                // 1 isRunning(currentCtl)為true,說明執行緒池還在執行中,不滿足中止條件
                // 2 當前執行緒池狀態已經大於等於TIDYING了,說明之前別的執行緒可能已經執行過tryTerminate,且通過了這個if校驗,不用重複執行了
                // 3 當前執行緒池是SHUTDOWN狀態,但工作佇列中還有任務沒處理完,也不滿足中止條件
                // 以上三個條件任意一個滿足即直接提前return返回
                return;
            }

            // 有兩種場景會走到這裡
            // 1 執行了shutdown方法(runState狀態為SHUTDOWN),且當前工作執行緒已經空了
            // 2 執行了shutdownNow方法(runState狀態為STOP)
            // 這個時候需要令所有的工作執行緒都主動的退出來回收資源
            if (workerCountOf(currentCtl) != 0) {
                // 如果當前工作執行緒個數不為0,說明還有別的工作執行緒在工作中。
                // 通過interruptIdleWorkers(true),打斷其中的一個idle執行緒,嘗試令其也執行runWorker中的processWorkerExit邏輯,並執行tryTerminate
                // 被中斷的那個工作執行緒也會執行同樣的邏輯(getTask方法返回->processWorkerExit->tryTerminate)
                // 這樣可以一個接著一個的不斷打斷每一個工作執行緒,令其逐步的退出(比起一次性的通知所有的idle工作執行緒,這樣相對平滑很多)
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            // 執行緒池狀態runState為SHUTDOWN或者STOP,且存活的工作執行緒個數已經為0了
            // 雖然前面的interruptIdleWorkers是一個一箇中斷idle執行緒的,但實際上有的工作執行緒是因為別的原因退出的(恰好workerCountOf為0了)
            // 所以這裡是可能存在並行的,因此通過mainLock加鎖防止並行,避免重複的terminated方法呼叫和termination.signalAll方法呼叫
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // cas的設定ctl的值為TIDYING+工作執行緒個數0(防止與別的地方ctl並行更新)
                if (ctl.compareAndSet(currentCtl, ctlOf(TIDYING, 0))) {
                    try {
                        // cas成功,呼叫terminated勾點函數
                        terminated();
                    } finally {
                        // 無論terminated勾點函數是否出現異常
                        // cas的設定ctl的值為TERMINATED最終態+工作執行緒個數0(防止與別的地方ctl並行更新)
                        ctl.set(ctlOf(TERMINATED, 0));
                        // 通知使用awaitTermination方法等待執行緒池關閉的其它執行緒(通過termination.await等待)
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }

            // 如果上述對ctl變數的cas操作失敗了,則進行重試,再來一次迴圈
            // else retry on failed CAS
        }
    }
如何保證工作執行緒一定能成功退出?

從上面tryTerminate方法的實現中可以看到,執行緒池必須等到所有工作執行緒都全部退出(workerCount為0),工作執行緒佔用的全部資源都回收後才會推進到終止態。
那麼之前啟動的工作執行緒一定能通過processWorkerExit退出並銷燬嗎?答案是不一定,這主要取決於使用者是否正確的編寫了令工作執行緒安全退出的任務邏輯。
因為只有能退出任務執行邏輯(runWorker方法中的task.run())的工作執行緒才有機會執行processWorkerExit,無法從任務中跳出(正常退出or拋異常)的工作執行緒將永遠無法退出,導致執行緒池也永遠無法推進到終態。

下面分情況討論:

  • 任務中的邏輯是一定會執行完正常結束的(沒有無限迴圈也沒有令執行緒陷入阻塞態的操作)。那麼這是沒問題的
   ()->{
        // 會正常結束的
        System.out.println("hello world!");
   };
  • 任務中存在需要無限迴圈的邏輯。那麼最好在迴圈條件內監聽一個volatile的變數,當需要執行緒池停止時,修改這個變數,從而令任務從無限迴圈中正常退出。
    ()->{
        // 無限迴圈
        while(true){
            System.out.println("hello world!");
        }
    };
    ()->{
        // 無限迴圈時監聽一個變數
        while(!isStop) {
            System.out.println("hello world!");
        }
    };
  • 任務中存在Condition.await等會阻塞當前執行緒,令其無法自然退出的邏輯。
    tryTerminate中停止工作執行緒時會呼叫Worker類的interruptIfStarted方法發出中斷指令(Thread.interrupt方法),如果被阻塞的方法是響應中斷的,那麼業務程式碼中不能無腦吞掉InterruptedException,而要能感知到中斷異常,在確實要關閉執行緒池時令任務退出(向上拋異常或正常退出)。
    而如果是不響應中斷的阻塞方法(如ReentrantLock.lock),則需要使用者自己保證這些方法最終能夠被喚醒,否則工作執行緒將無法正常退出而阻止執行緒池進入終止狀態。
    ()->{
            try {
                new ReentrantLock().newCondition().await();
            } catch (InterruptedException e) {
                // doSomething處理一些邏輯後。。。
                // 向上丟擲異常
                throw new XXXException(e);
            }
        }
    ()->{
        try {
            new ReentrantLock().newCondition().await();
        } catch (InterruptedException e) {

        }
        // doSomething處理一些邏輯後。。。正常退出
    }
為什麼不線上程池終止時使用Thread.stop方法強制令工作執行緒停止呢?

雖然Thread.stop能夠保證執行緒一定會被停止,但由於停止的過程中存在很嚴重的並行安全問題而被廢棄而不推薦使用了。
具體原因可以參考官方檔案(Why is Thread.stop deprecated?):https://docs.oracle.com/javase/8/docs/technotes/guides/concurrency/threadPrimitiveDeprecation.html

總結

  • 本篇部落格從原始碼的角度詳細分析了jdk執行緒池ThreadPoolExecutor關於優雅停止實現的原理。其中重點介紹了ThreadPoolExecutor是如何做到中止後不能再受理新的任務、中止時不丟失已提交任務以及關閉時不會發生執行緒資源的洩露等核心功能。
  • 結合之前釋出的第一篇關於ThreadPoolExecutor正常執行時接受並執行所提交任務的部落格,雖然沒有100%的覆蓋ThreadPoolExecutor的全部功能,但依然完整的講解了ThreadPoolExecutor最核心的功能。希望這兩篇部落格能幫助到對jdk執行緒池實現原理感興趣的讀者。
  • 本篇部落格的完整程式碼在我的github上:https://github.com/1399852153/Reinventing-the-wheel-for-learning(ThreadPool模組 MyThreadPoolExecutorV2) 內容如有錯誤,還請多多指教。