jdk執行緒池ThreadPoolExecutor工作原理解析(自己動手實現執行緒池)(一)

2022-11-11 06:00:16

jdk執行緒池ThreadPoolExecutor工作原理解析(自己動手實現執行緒池)(一)

執行緒池介紹

在日常開發中經常會遇到需要使用其它執行緒將大量任務非同步處理的場景(非同步化以及提升系統的吞吐量),而在使用執行緒的過程中卻存在著兩個痛點。

  1. 在java等很多主流語言中每個邏輯上的執行緒底層都對應著一個系統執行緒(不考慮虛擬執行緒的情況)。作業系統建立一個新執行緒是存在一定開銷的,
    在需要執行大量的非同步任務時,如果處理每個任務時都直接向系統申請建立一個執行緒來執行,並在任務執行完畢後再回收執行緒,則建立/銷燬大量執行緒的開銷將無法忍受。
  2. 每個系統執行緒都會佔用一定的記憶體空間,且系統在排程不同執行緒上下文切換時存在一定的cpu開銷。因此在一定的硬體條件下,作業系統能同時維護的系統執行緒個數相對而言是比較有限的。
    在使用執行緒的過程中如果沒有控制好流量,會很容易建立過多的執行緒而耗盡系統資源,令系統變得不可用。

而執行緒池正是為解決上述痛點而生的,其通過兩個手段來解決上述痛點。

池化執行緒資源

池化執行緒資源,顧名思義就是維護一個存活執行緒的集合(池子)。提交任務的使用者程式不直接控制執行緒的建立和銷燬,不用每次執行任務時都申請建立一個新執行緒,而是通過執行緒池間接的獲得執行緒去處理非同步任務。
執行緒池中的執行緒在執行完任務後通常也不會被系統回收掉,而是繼續待在池子中用於執行其它的任務(執行堆積的待執行任務或是等待新任務)。
執行緒池通過池化執行緒資源,避免了系統反覆建立/銷燬執行緒的開銷,大幅提高了處理大規模非同步任務時的效能。

對執行緒資源的申請進行收口,限制系統資源的使用

如果程式都統一使用執行緒池來處理非同步任務,則執行緒池內部便可以對系統資源的使用施加一定限制。
例如使用者可以指定一個執行緒池最大可維護的執行緒數量,以避免耗盡系統資源。
當用戶提交任務的速率過大,導致執行緒池中的執行緒數到達指定的最大值時依然無法滿足需求時,執行緒池可以通過丟棄部分任務或限制提交任務的流量的方式來處理這一問題。
執行緒池通過對執行緒資源的使用進行統一收口,使用者可以通過設定執行緒池的引數來控制系統資源的使用,從而避免系統資源耗盡。

jdk執行緒池ThreadPoolExecutor簡單介紹

前面介紹了執行緒池的概念,而要深入理解執行緒池的工作原理最好的辦法便是找到一個優秀的執行緒池實現來加以研究。
而自jdk1.5中引入的通用執行緒池框架ThreadPoolExecutor便是一個很好的學習物件。其內部實現不算複雜,卻在高效實現核心功能的同時還提供了較豐富的拓展能力。

下面從整體上介紹一下jdk通用執行緒池ThreadPoolExecutor的工作原理(基於jdk8)。

ThreadPoolExecutor執行時工作流程

首先ThreadPoolExecutor允許使用者從兩個不同維度來控制執行緒資源的使用,即最大核心執行緒數(corePoolSize)和最大執行緒數(maximumPoolSize)。
最大核心執行緒數:核心執行緒指的是通常常駐執行緒池的執行緒。常駐執行緒線上程池沒有任務空閒時也不會被銷燬,而是處於idle狀態,這樣在新任務到來時就能很快的進行響應。
最大執行緒數:和第一節中提到的一樣,即執行緒池中所能允許的活躍執行緒的最大數量。

在向ThreadPoolExecutor提交任務時(execute方法),會執行一系列的判斷來決定任務應該如何被執行(原始碼在下一節中具體分析)。

  1. 首先判斷當前活躍的執行緒數是否小於指定的最大核心執行緒數corePoolSize
    如果為真,則說明當前執行緒池還未完成預熱,核心執行緒數不飽和,建立一個新執行緒來執行該任務。
    如果為假,則說明當前執行緒池已完成預熱,進行下一步判斷。
  2. 嘗試將當前任務放入工作佇列workQueue(阻塞佇列BlockingQueue),工作佇列中的任務會被執行緒池中的活躍執行緒按入隊順序逐個消費。
    如果入隊成功,則說明當前工作佇列未滿,入隊的任務將會被執行緒池中的某個活躍執行緒所消費並執行。
    如果入隊失敗,則說明當前工作佇列已飽和,執行緒池消費任務的速度可能太慢了,可能需要建立更多新執行緒來加速消費,進行下一步判斷。
  3. 判斷當前活躍的執行緒數是否小於指定的最大執行緒數maximumPoolSize
    如果為真,則說明當前執行緒池所承載的執行緒數還未達到引數指定的上限,還有餘量來建立新的執行緒加速消費,建立一個新執行緒來執行該任務。
    如果為假,則說明當前執行緒池所承載的執行緒數達到了上限,但處理任務的速度依然不夠快,需要觸發拒絕策略

ThreadPoolExecutor優雅停止

執行緒池的優雅停止一般要能做到以下幾點:

  1. 執行緒池在中止後不能再受理新的任務
  2. 執行緒池中止的過程中,已經提交的現存任務不能丟失(等待剩餘任務執行完再關閉或者能夠把剩餘的任務吐出來還給使用者)
  3. 執行緒池最終關閉前,確保建立的所有工作執行緒都已退出,不會出現資源的洩露

執行緒池自啟動後便會有大量的工作執行緒在內部持續不斷並行的執行提交的各種任務,而要想做到優雅停止並不是一件容易的事情。
因此ThreadPoolExecutor中最複雜、細節最多的部分並不在於上文中的正常工作流程,而在於分散在各個地方但又緊密共同作業的,控制優雅停止的邏輯。

ThreadPoolExecutor的其它功能

除了正常的工作流程以及優雅停止的功能外,ThreadPoolExecutor還提供了一些比較好用的功能

  1. 提供了很多protected修飾的勾點函數,便於使用者繼承並實現自己的執行緒池時進行一定的拓展
  2. 在執行時統計了總共執行的任務數等關鍵指標,並提供了對應的api便於使用者在執行時觀察執行狀態
  3. 允許線上程池執行過程中動態修改關鍵的設定引數(比如corePoolSize等),並實時的生效。

jdk執行緒池ThreadPoolExecutor原始碼解析(自己動手實現執行緒池v1版本)

如費曼所說:What I can not create I do not understand(我不能理解我創造不了的東西)。
通過模仿jdk的ThreadPoolExecutor實現,從零開始實現一個執行緒池,可以迫使自己去仔細的捋清楚jdk執行緒池中設計的各種細節,加深理解而達到更好的學習效果。

前面提到ThreadPoolExecutor的核心邏輯主要分為兩部分,一是正常執行時處理提交的任務的邏輯,二是實現優雅停止的邏輯。
因此我們實現的執行緒池MyThreadPoolExecutor(以My開頭用於區分)也會分為兩個版本,v1版本只實現前一部分即正常執行時執行任務的邏輯,將有關執行緒池優雅停止的邏輯全部去除。
相比直接啃jdk最終實現的原始碼,v1版本的實現會更簡單更易理解,讓正常執行任務時的邏輯更加清晰而不會耦合太多關於優雅停止的邏輯。

執行緒池關鍵成員變數介紹

ThreadPoolExecutor中有許多的成員變數,大致可以分為三類。

可由使用者自定義的、用於控制執行緒池執行的設定引數

  1. volatile int corePoolSize(最大核心執行緒數量)
  2. volatile int maximumPoolSize(最大執行緒數量)
  3. volatile long keepAliveTime(idle執行緒保活時間)
  4. final BlockingQueue workQueue(工作佇列(阻塞佇列))
  5. volatile ThreadFactory threadFactory(工作執行緒工廠)
  6. volatile RejectedExecutionHandler handler(拒絕例外處理器)
  7. volatile boolean allowCoreThreadTimeOut(是否允許核心執行緒在idle超時後退出)

其中前6個設定引數都可以在ThreadPoolExecutor的建構函式中指定,而allowCoreThreadTimeOut則可以通過暴露的public方法allowCoreThreadTimeOut來動態的設定。
其中大部分屬性都是volatile修飾的,目的是讓執行過程中可以用過提供的public方法動態修改這些值後,執行緒池中的工作執行緒或提交任務的使用者執行緒能及時的感知到變化(執行緒間的可見性),並進行響應(比如令核心執行緒自動的idle退出)
這些設定屬性具體如何控制執行緒池行為的原理都會在下面的原始碼解析中展開介紹。理解這些引數的工作原理後才能在實際的業務中使用執行緒池時為其設定合適的值。

僅供執行緒池內部工作時使用的屬性

  1. ReentrantLock mainLock(用於控制各種臨界區邏輯的並行)
  2. HashSet workers(當前活躍工作執行緒Worker的集合,工作執行緒的工作原理會在下文介紹)
  3. AtomicInteger ctl(執行緒池控制狀態,control的簡寫)

這裡重點介紹一下ctl屬性。ctl雖然是一個32位元的整型欄位(AtomicInteger),但實際上卻用於標識兩個業務屬性,即當前執行緒池的執行狀態和worker執行緒的總數量。
線上程池初始化時狀態位RUNNING,worker執行緒數量位0(private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));)。
ctl的32位元中的高3位用於標識執行緒池當前的狀態,剩餘的29位用於標識執行緒池中worker執行緒的數量(因此理論上ThreadPoolExecutor最大可容納的執行緒數並不是231-1(32位元中符號要佔一位),而是229-1)
由於聚合之後單獨的讀寫某一個屬性不是很方便,所以ThreadPoolExecutor中提供了很多基於位運算的輔助函數來簡化這些邏輯。

ctl這樣聚合的設計比起拆分成兩個獨立的欄位有什麼好處?
在ThreadPoolExecutor中關於優雅停止的邏輯中有很多地方是需要同時判斷當前工作執行緒數量與執行緒池狀態後,再對執行緒池狀態工作執行緒數量進行更新的(具體邏輯在下一篇v2版本的部落格中展開)。
且為了執行效率,不使用互斥鎖而是通過cas重試的方法來解決並行更新的問題。而對一個AtomicInteger屬性做cas重試的更新,要比同時控制兩個屬性進行cas的更新要簡單很多,執行效率也高很多。

ThreadPoolExecutor共有五種狀態,但有四種都和優雅停止有關(除了RUNNING)。
但由於v1版本的MyThreadPoolExecutorV1不支援優雅停止,所以不在本篇部落格中講解這些狀態具體的含義以及其是如何變化的(下一篇v2版本的部落格中展開)

記錄執行緒池執行過程中的一些關鍵指標

  1. completedTaskCount(執行緒池自啟動後已完成的總任務數)
  2. largestPoolSize(執行緒池自啟動後工作執行緒個數的最大值)
    在執行過程中,ThreadPoolExecutor會在對應的地方進行埋點,統計一些指標並提供相應的api給使用者實時的查詢,以提高執行緒池工作時的可觀測性。
public class MyThreadPoolExecutorV1 implements MyThreadPoolExecutor{
    
   /**
    * 指定的最大核心執行緒數量
    * */
   private volatile int corePoolSize;

   /**
    * 指定的最大執行緒數量
    * */
   private volatile int maximumPoolSize;

   /**
    * 執行緒保活時間(單位:納秒 nanos)
    * */
   private volatile long keepAliveTime;

   /**
    * 存放任務的工作佇列(阻塞佇列)
    * */
   private final BlockingQueue<Runnable> workQueue;

   /**
    * 執行緒工廠
    * */
   private volatile ThreadFactory threadFactory;

   /**
    * 拒絕策略
    * */
   private volatile MyRejectedExecutionHandler handler;

   /**
    * 是否允許核心執行緒在idle一定時間後被銷燬(和非核心執行緒一樣)
    * */
   private volatile boolean allowCoreThreadTimeOut;

   /**
    * 主控鎖
    * */
   private final ReentrantLock mainLock = new ReentrantLock();

   /**
    * 當前執行緒池已完成的任務數量
    * */
   private long completedTaskCount;

   /**
    * 維護當前存活的worker執行緒集合
    * */
   private final HashSet<MyWorker> workers = new HashSet<>();

   /**
    * 當前執行緒池中存在的worker執行緒數量 + 狀態的一個聚合(通過一個原子int進行cas,來避免對兩個業務屬性欄位加鎖來保證一致性)
    * v1版本只關心前者,即worker執行緒數量
    */
   private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
   private static final int COUNT_BITS = Integer.SIZE - 3;

   /**
    * 32位元的有符號整數,有3位是用來存放執行緒池狀態的,所以用來維護當前工作執行緒個數的部分就只能用29位了
    * 被佔去的3位中,有1位原來的符號位,2位是原來的數值位。
    * */
   private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

   /**
    * 執行緒池狀態poolStatus常數(狀態值只會由小到大,單調遞增)
    * 執行緒池狀態遷移圖:
    *         ↗ SHUTDOWN ↘
    * RUNNING       ↓       TIDYING → TERMINATED
    *         ↘   STOP   ↗
    * 1 RUNNING狀態,代表著執行緒池處於正常執行的狀態。能正常的接收並處理提交的任務
    * 執行緒池物件初始化時,狀態為RUNNING
    * 對應邏輯:private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    *
    * 2 SHUTDOWN狀態,代表執行緒池處於停止對外服務的狀態。不再接收新提交的任務,但依然會將workQueue工作佇列中積壓的任務處理完
    * 呼叫了shutdown方法時,狀態由RUNNING -> SHUTDOWN
    * 對應邏輯:shutdown方法中的advanceRunState(SHUTDOWN);
    *
    * 3 STOP狀態,代表執行緒池處於停止狀態。不再接受新提交的任務,同時也不再處理workQueue工作佇列中積壓的任務,當前還在處理任務的工作執行緒將收到interrupt中斷通知
    * 之前未呼叫shutdown方法,直接呼叫了shutdownNow方法,狀態由RUNNING -> STOP
    * 之前先呼叫了shutdown方法,後呼叫了shutdownNow方法,狀態由SHUTDOWN -> STOP
    * 對應邏輯:shutdownNow方法中的advanceRunState(STOP);
    *
    * 4 TIDYING狀態,代表著執行緒池即將完全終止,正在做最後的收尾工作
    * 當前執行緒池狀態為SHUTDOWN,任務被消費完工作佇列workQueue為空,且工作執行緒全部退出完成工作執行緒集合workers為空時,tryTerminate方法中將狀態由SHUTDOWN->TIDYING
    * 當前執行緒池狀態為STOP,工作執行緒全部退出完成工作執行緒集合workers為空時,tryTerminate方法中將狀態由STOP->TIDYING
    * 對應邏輯:tryTerminate方法中的ctl.compareAndSet(c, ctlOf(TIDYING, 0)
    *
    * 5 TERMINATED狀態,代表著執行緒池完全的關閉。之前執行緒池已經處於TIDYING狀態,且呼叫的勾點函數terminated已返回
    * 當前執行緒池狀態為TIDYING,呼叫的勾點函數terminated已返回
    * 對應邏輯:tryTerminate方法中的ctl.set(ctlOf(TERMINATED, 0));
    * */
   private static final int RUNNING = -1 << COUNT_BITS;
   private static final int SHUTDOWN = 0 << COUNT_BITS;
   private static final int STOP = 1 << COUNT_BITS;
   private static final int TIDYING = 2 << COUNT_BITS;
   private static final int TERMINATED = 3 << COUNT_BITS;

   // Packing and unpacking ctl
   private static int workerCountOf(int c)  { return c & CAPACITY; }
   private static int ctlOf(int rs, int wc) { return rs | wc; }

   /**
    * 跟蹤執行緒池曾經有過的最大執行緒數量(只能在mainLock的並行保護下更新)
    */
   private int largestPoolSize;

   private boolean compareAndIncrementWorkerCount(int expect) {
      return this.ctl.compareAndSet(expect, expect + 1);
   }
   private boolean compareAndDecrementWorkerCount(int expect) {
      return ctl.compareAndSet(expect, expect - 1);
   }

   private void decrementWorkerCount() {
      do {
         // cas更新,workerCount自減1
      } while (!compareAndDecrementWorkerCount(ctl.get()));
   }

   public MyThreadPoolExecutorV1(int corePoolSize,
                                 int maximumPoolSize,
                                 long keepAliveTime,
                                 TimeUnit unit,
                                 BlockingQueue<Runnable> workQueue,
                                 ThreadFactory threadFactory,
                                 MyRejectedExecutionHandler handler) {
      // 基本的引數校驗
      if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) {
         throw new IllegalArgumentException();
      }

      if (unit == null || workQueue == null || threadFactory == null || handler == null) {
         throw new NullPointerException();
      }

      // 設定成員變數
      this.corePoolSize = corePoolSize;
      this.maximumPoolSize = maximumPoolSize;
      this.workQueue = workQueue;
      this.keepAliveTime = unit.toNanos(keepAliveTime);
      this.threadFactory = threadFactory;
      this.handler = handler;
   }

   public ThreadFactory getThreadFactory() {
      return threadFactory;
   }
}

Worker工作執行緒

ThreadPoolExecutor中的工作執行緒並不是裸的Thread,而是被封裝在了一個Worker的內部類中。
Worker實現了Runnable所以可以作為一個普通的執行緒來啟動,在run方法中只是簡單的呼叫了一下runWorker(runWorker後面再展開)。
Worker類有三個成員屬性:

  1. Thread thread(被封裝的工作執行緒物件)
  2. Runnable firstTask(提交任務時,建立新Worker物件時指定的第一次要執行的任務(後續執行緒就會去拉取工作佇列裡的任務執行了))
  3. volatile long completedTasks(統計用,計算當前工作執行緒總共完成了多少個任務)

Worker內封裝的實際的工作執行緒物件thread,其在建構函式中由執行緒池的執行緒工廠threadFactory生成,傳入this,所以thread在start後,便會呼叫run方法進而執行runWorker。
執行緒工廠可以由使用者在建立執行緒池時通過引數指定,因此使用者在自由控制所生成的工作執行緒的同時,也需要保證newThread能正確的返回一個可用的執行緒物件。

除此之外,Worker物件還繼承了AbstractQueuedSynchronizer(AQS)類,簡單的實現了一個不可重入的互斥鎖。
對AQS互斥模式不太瞭解的讀者可以參考一下我之前關於AQS互斥模式的部落格:AQS互斥模式與ReentrantLock可重入鎖原理解析
AQS中維護了一個volatile修飾的int型別的成員變數state,其具體的含義可以由使用者自己定義。
在Worker中,state的值有三種狀態:

  1. state=-1,標識工作執行緒還未啟動(不會被interruptIfStarted打斷)
  2. state=0,標識工作執行緒已經啟動,但沒有開始處理任務(可能是在等待任務,idle狀態)
  3. state=1,標識worker執行緒正在執行任務(runWorker方法中,成功獲得任務後,通過lock方法將state設定為1)

具體這三種情況分別在什麼時候出現會在下面解析提交任務原始碼的那部分裡詳細介紹。

    /**
     * jdk的實現中令Worker繼承AbstractQueuedSynchronizer並實現了一個不可重入的鎖
     * AQS中的state屬性含義
     * -1:標識工作執行緒還未啟動
     *  0:標識工作執行緒已經啟動,但沒有開始處理任務(可能是在等待任務,idle狀態)
     *  1:標識worker執行緒正在執行任務(runWorker中,成功獲得任務後,通過lock方法將state設定為1)
     * */
    private final class MyWorker extends AbstractQueuedSynchronizer implements Runnable{

        final Thread thread;
        Runnable firstTask;
        volatile long completedTasks;

        public MyWorker(Runnable firstTask) {
            this.firstTask = firstTask;

            // newThread可能是null
            this.thread = getThreadFactory().newThread(this);
        }

        @Override
        public void run() {
            runWorker(this);
        }

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock(){
            acquire(1);
        }

        public boolean tryLock(){
            return tryAcquire(1);
        }

        public void unlock(){
            release(1);
        }

        public boolean isLocked(){
            return isHeldExclusively();
        }

        void interruptIfStarted() {
            Thread t;
            // 三個條件同時滿足,才去中斷Worker對應的thread
            // getState() >= 0,用於過濾還未執行runWorker的,剛入隊初始化的Worker
            // thread != null,用於過濾掉構造方法中ThreadFactory.newThread返回null的Worker
            // !t.isInterrupted(),用於過濾掉那些已經被其它方式中斷的Worker執行緒(比如使用者自己去觸發中斷,提前終止執行緒池中的任務)
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

execute執行提交的任務

下面介紹本篇部落格的重點,即執行緒池是如何執行使用者所提交的任務的。
使用者提交任務的入口是public的execute方法,Runnable型別的引數command就是提交的要執行的任務。

MyThreadPoolExecutorV1的execute方法(相比jdk的實現v1版本去掉了關於優雅停止的邏輯)
   /**
     * 提交任務,並執行
     * */
    public void execute(Runnable command) {
        if (command == null){
            throw new NullPointerException("command引數不能為空");
        }

        int currentCtl = this.ctl.get();
        if (workerCountOf(currentCtl) < this.corePoolSize) {
            // 如果當前存在的worker執行緒數量低於指定的核心執行緒數量,則建立新的核心執行緒
            boolean addCoreWorkerSuccess = addWorker(command,true);
            if(addCoreWorkerSuccess){
                // addWorker新增成功,直接返回即可
                return;
            }
        }

        // 走到這裡有兩種情況
        // 1 因為核心執行緒超過限制(workerCountOf(currentCtl) < corePoolSize == false),需要嘗試嘗試將任務放入阻塞佇列
        // 2 addWorker返回false,建立核心工作執行緒失敗
        if(this.workQueue.offer(command)){
            // workQueue.offer入隊成功

            if(workerCountOf(currentCtl) == 0){
                // 在corePoolSize為0的情況下,當前不存在存活的核心執行緒
                // 一個任務在入隊之後,如果當前執行緒池中一個執行緒都沒有,則需要兜底的建立一個非核心執行緒來處理入隊的任務
                // 因此firstTask為null,目的是先讓任務先入隊後建立執行緒去拉取任務並執行
                addWorker(null,false);
            }else{
                // 加入佇列成功,且當前存在worker執行緒,成功返回
                return;
            }
        }else{
            // 阻塞佇列已滿,嘗試建立一個新的非核心執行緒處理
            boolean addNonCoreWorkerSuccess = addWorker(command,false);
            if(!addNonCoreWorkerSuccess){
                // 建立非核心執行緒失敗,執行拒絕策略(失敗的原因和前面建立核心執行緒addWorker的原因類似)
                reject(command);
            }else{
                // 建立非核心執行緒成功,成功返回
                return;
            }
        }
    }

   /**
    * 根據指定的拒絕處理器,執行拒絕策略
    * */
    private void reject(Runnable command) {
        this.handler.rejectedExecution(command, this);
    }    

可以看到,execute方法原始碼中對於任務處理的邏輯很清晰,也能與ThreadPoolExecutor執行時工作流程中所介紹的流程所匹配。

addWorker方法(建立新的工作執行緒)

在execute方法中當需要建立核心執行緒或普通執行緒時,便需要通過addWorker方法嘗試建立一個新的工作執行緒。

   /**
     * 向執行緒池中加入worker
     * */
    private boolean addWorker(Runnable firstTask, boolean core) {
        // retry標識外層迴圈
        retry:
        for (;;) {
            int currentCtl = ctl.get();

            // 用於cas更新workerCount的內層迴圈(注意這裡面與jdk的寫法不同,改寫成了邏輯一致但更可讀的形式)
            for (;;) {
                // 判斷當前worker數量是否超過了限制
                int workerCount = workerCountOf(currentCtl);
                if (workerCount >= CAPACITY) {
                    // 當前worker數量超過了設計上允許的最大限制
                    return false;
                }
                if (core) {
                    // 建立的是核心執行緒,判斷當前執行緒數是否已經超過了指定的核心執行緒數
                    if (workerCount >= this.corePoolSize) {
                        // 超過了核心執行緒數,建立核心worker執行緒失敗
                        return false;
                    }
                } else {
                    // 建立的是非核心執行緒,判斷當前執行緒數是否已經超過了指定的最大執行緒數
                    if (workerCount >= this.maximumPoolSize) {
                        // 超過了最大執行緒數,建立非核心worker執行緒失敗
                        return false;
                    }
                }

                // cas更新workerCount的值
                boolean casSuccess = compareAndIncrementWorkerCount(currentCtl);
                if (casSuccess) {
                    // cas成功,跳出外層迴圈
                    break retry;
                }

                // compareAndIncrementWorkerCount方法cas爭搶失敗,重新執行內層迴圈
            }
        }

        boolean workerStarted = false;

        MyWorker newWorker = null;
        try {
            // 建立一個新的worker
            newWorker = new MyWorker(firstTask);
            final Thread myWorkerThread = newWorker.thread;
            if (myWorkerThread != null) {
                // MyWorker初始化時內部執行緒建立成功

                // 加鎖,防止並行更新
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();

                try {
                    if (myWorkerThread.isAlive()) {
                        // 預檢查執行緒的狀態,剛初始化的worker執行緒必須是未喚醒的狀態
                        throw new IllegalThreadStateException();
                    }

                    // 加入worker集合
                    this.workers.add(newWorker);

                    int workerSize = workers.size();
                    if (workerSize > largestPoolSize) {
                        // 如果當前worker個數超過了之前記錄的最大存活執行緒數,將其更新
                        largestPoolSize = workerSize;
                    }

                    // 建立成功
                } finally {
                    // 無論是否發生異常,都先將主控鎖解鎖
                    mainLock.unlock();
                }

                // 加入成功,啟動worker執行緒
                myWorkerThread.start();
                // 標識為worker執行緒啟動成功,並作為返回值返回
                workerStarted = true;
            }
        }finally {
            if (!workerStarted) {
                addWorkerFailed(newWorker);
            }
        }

        return workerStarted;
    }

addWorker可以分為兩部分:判斷當前是否滿足建立新工作執行緒的條件、建立並啟動新的Worker工作執行緒。

判斷當前是否滿足建立新工作執行緒的條件

入口處開始的retry標識的for迴圈部分,便是用於判斷是否滿足建立新工作執行緒的條件。

  • 首先判斷當前工作執行緒數量是否超過了理論的最大值CAPACITY(即2^29-1),超過了則不能建立,返回false,不建立新工作執行緒
  • 根據boolean型別引數core判斷是否建立核心工作執行緒,core=true則判斷是否超過了corePoolSize的限制,core=false則判斷是否超過了maximumPoolSize的限制。不滿足則返回false,不建立新工作執行緒
  • 滿足上述限制條件後,則說明可以建立新執行緒了,compareAndIncrementWorkerCount方法進行cas的增加當前工作執行緒數。
    如果cas失敗,則說明存在並行的更新了,則再一次的迴圈重試,並再次的進行上述檢查。

需要注意的是:這裡面有兩個for迴圈的原因在於v1版本省略了優雅停止的邏輯(所以實際上v1版本能去掉內層迴圈的)。如果執行緒池處於停止狀態則不能再建立新工作執行緒了,因此也需要判斷執行緒池當前的狀態,
不滿足條件則也需要返回false,不建立工作執行緒。
而且compareAndIncrementWorkerCount中cas更新ctl時,如果並行的執行緒池被停止而導致執行緒池狀態發生了變化,也會導致cas失敗重新檢查。
這也是jdk的實現中為什麼把執行緒池狀態和工作執行緒數量繫結在一起的原因之一,這樣在cas更新時可以原子性的同時檢查兩個欄位的並行爭搶。(更具體的細節會在下一篇部落格的v2版本中介紹)

建立並啟動新的Worker工作執行緒

在通過retry那部分的層層條件檢查後,緊接著便是實際建立新工作執行緒的邏輯。

  • 首先通過Worker的構造方法建立一個新的Worker物件,並將使用者提交的任務作為firstTask引數傳入。
  • 判斷Worker在構造時執行緒工廠是否正確的生成了一個Thread(判空),如果thread == null的話直接返回false,標識建立新工作執行緒失敗。
  • 在mainLock的保護下,將新建立的worker執行緒加入workers集合中
  • 啟動Worker中的執行緒(myWorkerThread.start()),啟動後會執行Worker類中的run方法,新的工作執行緒會執行runWorker方法(下文會展開分析runWorker)
  • 如果Worker中的執行緒不是alive狀態等原因導致工作執行緒啟動失敗,則在finally中通過addWorkerFailed進行一系列的回滾操作

雖然在前面執行緒池工作流程的分析中提到了核心執行緒與非核心執行緒的概念,但Worker類中實際上並沒有核心/非核心的標識。
經過了工作執行緒啟動前的條件判斷後,新建立的工作執行緒實際上並沒有真正的核心與非核心的差別。

addWorkerFailed(addWorker的逆向回滾操作)

addWorker中工作執行緒可能會啟動失敗,所以要對addWorker中對workers集合以及workerCount等資料的操作進行回滾。

   /**
     * 當建立worker出現異常失敗時,對之前的操作進行回滾
     * 1 如果新建立的worker加入了workers集合,將其移除
     * 2 減少記錄存活的worker個數(cas更新)
     * 3 檢查執行緒池是否滿足中止的狀態,防止這個存活的worker執行緒阻止執行緒池的中止(v1版本不考慮,省略了tryTerminate)
     */
    private void addWorkerFailed(MyWorker myWorker) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (myWorker != null) {
                // 如果新建立的worker加入了workers集合,將其移除
                workers.remove(myWorker);
            }
            // 減少存活的worker個數
            decrementWorkerCount();

            // 嘗試著將當前worker執行緒終止(addWorkerFailed由工作執行緒自己呼叫)
            // tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

runWorker(工作執行緒核心執行邏輯)

前面介紹了使用者如何向執行緒池提交任務,以及如何建立新工作執行緒Worker,下面介紹工作執行緒線上程池中是如何執行的。

  • runWorker方法內部本質上是一個無限迴圈,在進入主迴圈之前通過unlock方法,將內部AQS父類別中的state標識為0,允許被外部中斷(可以被interruptIfStarted選中而打斷)
  • 之後便是主迴圈,如果firstTask不為空(說明第一次啟動),則直接呼叫task.run方法。否則通過getTask方法嘗試從工作佇列中撈取一個任務來執行
  • 在實際的任務執行前和執行後都呼叫對應的勾點方法(beforeExecute、afterExecute)
  • 在任務執行前通過lock方法將AQS的state方法設定為1代表當前Worker正在執行任務,並在執行完一個任務後在finally中進行unlock解鎖,令當前工作執行緒進入idle狀態。
    同時清空firstTask的值(清空後下一次迴圈就會通過getTask獲取任務了)並令Worker中的completedTasks統計指標也自增1
  • 如果任務執行過程中出現了異常,則catch住並最終向上丟擲跳出主迴圈,finally中執行processWorkerExit(認為任務一旦執行出現了異常,則很可能工作執行緒內部的一些狀態已經損壞,需要重新建立一個新的工作執行緒來代替出異常的老工作執行緒)
  • 有兩種情況會導致執行processWorkerExit,一種是上面說的任務執行時出現了異常,此時completedAbruptly=true;還有一種可能時getTask因為一些原因返回了null,此時completedAbruptly=false。
    completedAbruptly會作為processWorkerExit的引數傳遞。
   /**
     * worker工作執行緒主迴圈執行邏輯
     * */
    private void runWorker(MyWorker myWorker) {
        // 時worker執行緒的run方法呼叫的,此時的current執行緒的是worker執行緒
        Thread workerThread = Thread.currentThread();

        Runnable task = myWorker.firstTask;
        // 已經暫存了firstTask,將其清空(有地方根據firstTask是否存在來判斷工作執行緒中負責的任務是否是新提交的)
        myWorker.firstTask = null;

        // 將state由初始化時的-1設定為0
        // 標識著此時當前工作執行緒開始工作了,這樣可以被interruptIfStarted選中
        myWorker.unlock();

        // 預設執行緒是由於中斷退出的
        boolean completedAbruptly = true;
        try {
            // worker執行緒處理主迴圈,核心邏輯
            while (task != null || (task = getTask()) != null) {
                // 將state由0標識為1,代表著其由idle狀態變成了正在工作的狀態
                // 這樣interruptIdleWorkers中的tryLock會失敗,這樣工作狀態的執行緒就不會被該方法中斷任務的正常執行
                myWorker.lock();

                // v1版本此處省略優雅停止相關的核心邏輯

                try {
                    // 任務執行前的勾點函數
                    beforeExecute(workerThread, task);
                    Throwable thrown = null;
                    try {
                        // 拿到的任務開始執行
                        task.run();
                    } catch (RuntimeException | Error x) {
                        // 使用thrown收集丟擲的異常,傳遞給afterExecute
                        thrown = x;
                        // 同時丟擲錯誤,從而中止主迴圈
                        throw x;
                    } catch (Throwable x) {
                        // 使用thrown收集丟擲的異常,傳遞給afterExecute
                        thrown = x;
                        // 同時丟擲錯誤,從而中止主迴圈
                        throw new Error(x);
                    } finally {
                        // 任務執行後的勾點函數,如果任務執行時丟擲了錯誤/異常,thrown不為null
                        afterExecute(task, thrown);
                    }
                } finally {
                    // 將task設定為null,令下一次while迴圈通過getTask獲得新任務
                    task = null;
                    // 無論執行時是否存在異常,已完成的任務數加1
                    myWorker.completedTasks++;
                    // 無論如何將myWorker解鎖,標識為idle狀態
                    myWorker.unlock();
                }

            }
            // getTask返回了null,說明沒有可執行的任務或者因為idle超時、執行緒數超過設定等原因需要回收當前執行緒。
            // 執行緒正常的退出,completedAbruptly為false
            completedAbruptly = false;
        }finally {
            // getTask返回null,執行緒正常的退出,completedAbruptly值為false
            // task.run()執行時丟擲了異常/錯誤,直接跳出了主迴圈,此時completedAbruptly為初始化時的預設值true
            processWorkerExit(myWorker, completedAbruptly);

            // processWorkerExit執行完成後,worker執行緒對應的run方法(run->runWorker)也會執行完畢
            // 此時執行緒物件會進入終止態,等待作業系統回收
            // 而且processWorkerExit方法內將傳入的Worker從workers集合中移除,jvm中的物件也會因為不再被參照而被GC回收
            // 此時,當前工作執行緒所佔用的所有資源都已釋放完畢
        }
    }
getTask嘗試獲取任務執行

runWorker中是通過getTask獲取任務的,getTask中包含著工作執行緒是如何從工作佇列中獲取任務的關鍵邏輯。

  • 在獲取任務前,需要通過getTask檢查當前執行緒池的執行緒數量是否超過了引數設定(啟動後被動態調整了),因此需要先獲得當前執行緒池工作執行緒總數workCount。
    如果當前工作執行緒數量超過了指定的最大執行緒個數maximumPoolSize限制,則說明當前執行緒需要退出了
  • timed標識用於決定當前執行緒如何從工作佇列(阻塞佇列)中獲取新任務,如果timed為true則通過poll方法獲取同時指定相應的超時時間(設定引數keepAliveTime),如果timed為false則通過take方法無限期的等待。
    如果工作佇列並不為空,則poll和take方法都會立即返回一個任務物件。而當工作佇列為空時,工作執行緒則會阻塞在工作佇列上以讓出CPU(idle狀態)直到有新的任務到來而被喚醒(或者超時喚醒)。
    這也是儲存任務的workQueue不能是普通的佇列,而必須是阻塞佇列的原因。(對阻塞佇列工作原理不太清楚的讀者可以參考我以前的部落格:自己動手實現一個阻塞佇列
  • timed的值由兩方面共同決定。一是設定引數allowCoreThreadTimeOut是否為true,為true的話說明不管是核心執行緒還是非核心執行緒都需要在idle等待keepAliveTime後銷燬退出。所以allowCoreThreadTimeOut=true,則timed一定為true
    二是如果allowCoreThreadTimeOut為false,說明核心執行緒不需要退出,而非核心執行緒在idle等待keepAliveTime後需要銷燬退出。則判斷當前workCount是否大於設定的corePoolSize,是的話則timed為true否則為false。
    如果當前執行緒數超過了指定的最大核心執行緒數corePoolSize,則需要讓工作佇列為空時(說明執行緒池負載較低)部分idle執行緒退出,使得最終活躍的執行緒數減少到和corePoolSize一致。
    從這裡可以看到,核心與非核心執行緒的概念在ThreadPoolExecutor裡是很弱的,不關心工作執行緒最初是以什麼原因建立的都一視同仁,誰都可能被當作非核心執行緒而銷燬退出。
  • timedOut標識當前工作執行緒是否因為poll拉取任務時出現了超時。take永遠不會返回null,因此只有poll在超時時會返回null,當poll返回值為null時,表明是等待了keepAliveTime時間後超時了,所以timedOut標識為true。
    同時如果拉取任務時執行緒被中斷了,則捕獲InterruptedException異常,將timeOut標識為false(被中斷的就不認為是超時)。
  • 當(workCount > maximumPoolSize)或者 (timed && timedOut)兩者滿足一個時,就說明當前執行緒應該要退出了。
    此時將當前的workCount用cas的方式減去1,返回null代表獲取任務失敗即可;如果cas失敗,則在for迴圈中重試。
    但有一種情況是例外的(workCount <= 1 && !workQueue.isEmpty()),即當前工作執行緒數量恰好為1,且工作佇列不為空(那麼還需要當前執行緒繼續工作把工作佇列裡的任務都消費掉,無論如何不能退出)
   /**
     * 嘗試著從阻塞佇列裡獲得待執行的任務
     * @return 返回null代表工作佇列為空,沒有需要執行的任務; 或者當前worker執行緒滿足了需要退出的一些條件
     *         返回對應的任務
     * */
    private Runnable getTask() {
        boolean timedOut = false;

        for(;;) {
            int currentCtl = ctl.get();

            // 獲得當前工作執行緒個數
            int workCount = workerCountOf(currentCtl);

            // 有兩種情況需要指定超時時間的方式從阻塞佇列workQueue中獲取任務(即timed為true)
            // 1.執行緒池設定引數allowCoreThreadTimeOut為true,即允許核心執行緒在idle一定時間後被銷燬
            //   所以allowCoreThreadTimeOut為true時,需要令timed為true,這樣可以讓核心執行緒也在一定時間內獲取不到任務(idle狀態)而被銷燬
            // 2.執行緒池設定引數allowCoreThreadTimeOut為false,但當前執行緒池中的執行緒數量workCount大於了指定的核心執行緒數量corePoolSize
            //   說明當前有一些非核心的執行緒正在工作,而非核心的執行緒在idle狀態一段時間後需要被銷燬
            //   所以此時也令timed為true,讓這些執行緒在keepAliveTime時間內由於佇列為空拉取不到任務而返回null,將其銷燬
            boolean timed = allowCoreThreadTimeOut || workCount > corePoolSize;

            // 有共四種情況不需要往下執行,代表
            // 1 (workCount > maximumPoolSize && workCount > 1)
            // 當前工作執行緒個數大於了指定的maximumPoolSize(可能是由於啟動後通過setMaximumPoolSize調小了maximumPoolSize的值)
            // 已經不符合執行緒池的設定引數約束了,要將多餘的工作執行緒回收掉
            // 且當前workCount > 1說明存在不止一個工作執行緒,意味著即使將當前工作執行緒回收後也還有其它工作執行緒能繼續處理工作佇列裡的任務,直接返回null表示自己需要被回收

            // 2 (workCount > maximumPoolSize && workCount <= 1 && workQueue.isEmpty())
            // 當前工作執行緒個數大於了指定的maximumPoolSize(maximumPoolSize被設定為0了)
            // 已經不符合執行緒池的設定引數約束了,要將多餘的工作執行緒回收掉
            // 但此時workCount<=1,說明將自己這個工作執行緒回收掉後就沒有其它工作執行緒能處理工作佇列裡剩餘的任務了
            // 所以即使maximumPoolSize設定為0,也需要等待任務被處理完,工作佇列為空之後才能回收當前執行緒,否則還會繼續拉取剩餘任務

            // 3 (workCount <= maximumPoolSize && (timed && timedOut) && workCount > 1)
            // workCount <= maximumPoolSize符合要求
            // 但是timed && timedOut,說明timed判定命中,需要以poll的方式指定超時時間,並且最近一次拉取任務超時了timedOut=true
            // 進入新的一次迴圈後timed && timedOut成立,說明當前worker執行緒處於idle狀態等待任務超過了規定的keepAliveTime時間,需要回收當前執行緒
            // 且當前workCount > 1說明存在不止一個工作執行緒,意味著即使將當前工作執行緒回收後也還有其它工作執行緒能繼續處理工作佇列裡的任務,直接返回null表示自己需要被回收

            // 4 (workCount <= maximumPoolSize && (timed && timedOut) && workQueue.isEmpty())
            // workCount <= maximumPoolSize符合要求
            // 但是timed && timedOut,說明timed判定命中,需要以poll的方式指定超時時間,並且最近一次拉取任務超時了timedOut=true
            // 進入新的一次迴圈後timed && timedOut成立,說明當前worker執行緒處於idle狀態等待任務超過了規定的keepAliveTime時間,需要回收當前執行緒
            // 但此時workCount<=1,說明將自己這個工作執行緒回收掉後就沒有其它工作執行緒能處理工作佇列裡剩餘的任務了
            // 所以即使timed && timedOut超時邏輯匹配,也需要等待任務被處理完,工作佇列為空之後才能回收當前執行緒,否則還會繼續拉取剩餘任務
            if ((workCount > maximumPoolSize || (timed && timedOut))
                    && (workCount > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(currentCtl)) {
                    // 滿足上述條件,說明當前執行緒需要被銷燬了,返回null
                    return null;
                }

                // compareAndDecrementWorkerCount方法由於並行的原因cas執行失敗,continue迴圈重試
                continue;
            }

            try {
                // 根據上面的邏輯的timed標識,決定以什麼方式從阻塞佇列中獲取任務
                Runnable r = timed ?
                        // timed為true,通過poll方法指定獲取任務的超時時間(如果指定時間內沒有佇列依然為空,則返回)
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        // timed為false,通過take方法無限期的等待阻塞佇列中加入新的任務
                        workQueue.take();
                if (r != null) {
                    // 獲得了新的任務,getWork正常返回對應的任務物件
                    return r;
                }else{
                    // 否則說明timed=true,且poll拉取任務時超時了
                    timedOut = true;
                }
            } catch (InterruptedException retry) {
                // poll or take任務等待時worker執行緒被中斷了,捕獲中斷異常
                // timeout = false,標識拉取任務時沒有超時
                timedOut = false;
            }
        }
    }
processWorkerExit(處理工作執行緒退出)

在runWorker中,如果getTask方法沒有拿到任務返回了null或者任務在執行時丟擲了異常就會在最終的finally塊中呼叫processWorkerExit方法,令當前工作執行緒銷燬退出。

  • processWorkerExit方法內會將當前執行緒佔用的一些資源做清理,比如從workers中移除掉當前執行緒(利於Worker物件的GC),並令當前執行緒workerCount減一(completedAbruptly=true,說明是中斷導致的退出,getTask中沒來得及減workerCount,在這裡補正)
  • completedAbruptly=true,說明是runWorker中任務異常導致的執行緒退出,無條件的通過addWorker重新建立一個新的工作執行緒代替當前退出的工作執行緒。
  • completedAbruptly=false,在退出當前工作執行緒後,需要判斷一下退出後當前所存活的工作執行緒數量是否滿足要求。
    比如allowCoreThreadTimeOut=false時,當前工作執行緒個數是否不低於corePoolSize等,如果不滿足要求則通過addWorker重新建立一個新的執行緒。
工作執行緒退出時所佔用資源的回收
  • processWorkerExit方法執行完畢後,當前工作執行緒就完整的從當前執行緒池中退出了(workers中沒有了參照,workerCount減1了),GC便會將記憶體中的Worker物件所佔用的記憶體給回收掉。
  • 同時runWorker中最後執行完processWorkerExit後,工作執行緒的run方法也return了,標識著整個執行緒正常退出了,作業系統層面上也會將執行緒轉為終止態並最終回收。至此,執行緒佔用的所有資源就被徹底的回收乾淨了。
 /**
     * 處理worker執行緒退出
     * @param myWorker 需要退出的工作執行緒物件
     * @param completedAbruptly 是否是因為中斷異常的原因,而需要回收
     * */
    private void processWorkerExit(MyWorker myWorker, boolean completedAbruptly) {
        if (completedAbruptly) {
            // 如果completedAbruptly=true,說明是任務在run方法執行時出錯導致的執行緒退出
            // 而正常退出時completedAbruptly=false,在getTask中已經將workerCount的值減少了
            decrementWorkerCount();
        }

        ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 執行緒池全域性總完成任務數累加上要退出的工作執行緒已完成的任務數
            this.completedTaskCount += myWorker.completedTasks;
            // workers集合中將當前工作執行緒剔除
            workers.remove(myWorker);

            // completedTaskCount是long型別的,workers是HashSet,
            // 都是非執行緒安全的,所以在mainLock的保護進行修改
        } finally {
            mainLock.unlock();
        }

        int currentCtl = this.ctl.get();

        if (!completedAbruptly) {
            // completedAbruptly=false,說明不是因為中斷異常而退出的
            // min標識當前執行緒池允許的最小執行緒數量
            // 1 如果allowCoreThreadTimeOut為true,則核心執行緒也可以被銷燬,min=0
            // 2 如果allowCoreThreadTimeOut為false,則min應該為所允許的核心執行緒個數,min=corePoolSize
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty()) {
                // 如果min為0了,但工作佇列不為空,則修正min=1,因為至少需要一個工作執行緒來將工作佇列中的任務消費、處理掉
                min = 1;
            }
            if (workerCountOf(currentCtl) >= min) {
                // 如果當前工作執行緒數大於了min,當前執行緒數量是足夠的,直接返回(否則要執行下面的addWorker恢復)
                return;
            }
        }
        // 兩種場景會走到這裡進行addWorker操作
        // 1 completedAbruptly=true,說明執行緒是因為中斷異常而退出的,需要重新建立一個新的工作執行緒
        // 2 completedAbruptly=false,且上面的workerCount<min,則說明當前工作執行緒數不夠,需要建立一個
        // 為什麼引數core傳的是false呢?
        // 因為completedAbruptly=true而中斷退出的執行緒,無論當前工作執行緒數是否大於核心執行緒,都需要建立一個新的執行緒來代替原有的被退出的執行緒
        addWorker(null, false);
    }

動態修改設定引數

ThreadPoolExecutor除了支援啟動前通過建構函式設定設定引數外,也允許線上程池執行的過程中動態的更改設定。而要實現動態的修改設定,麻煩程度要比啟動前靜態的指定大得多。
舉個例子,線上程池的執行過程中如果當前corePoolSize=20,且已經建立了20個核心執行緒時(workerCount=20),現在將corePoolSize減少為10或者增大為30時應該如何實時的生效呢?
下面通過內嵌於程式碼中的註釋,詳細的說明了allowCoreThreadTimeOutcorePoolSizemaximumPoolSize這三個關鍵設定引數實現動態修改的原理。

   /**
     * 設定是否允許核心執行緒idle超時後退出
     * */
    public void allowCoreThreadTimeOut(boolean value) {
        if (value && keepAliveTime <= 0) {
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
        }
        // 判斷一下新舊值是否相等,避免無意義的volatile變數更新,導致不必要的cpu cache同步
        if (value != allowCoreThreadTimeOut) {
            allowCoreThreadTimeOut = value;
            if (value) {
                // 引數值value為true,說明之前不允許核心執行緒由於idle超時而退出
                // 而此時更新為true說明現在允許了,則通過interruptIdleWorkers喚醒所有的idle執行緒
                // 令其走一遍runWorker中的邏輯,嘗試著讓idle超時的核心執行緒及時銷燬
                interruptIdleWorkers();
            }
        }
    }

    /**
     * 動態更新核心執行緒最大值corePoolSize
     * */
    public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize < 0) {
            throw new IllegalArgumentException();
        }

        // 計算差異
        int delta = corePoolSize - this.corePoolSize;
        // 賦值
        this.corePoolSize = corePoolSize;
        if (workerCountOf(this.ctl.get()) > corePoolSize) {
            // 更新完畢後,發現當前工作執行緒數超過了指定的值
            // 喚醒所有idle執行緒,讓目前空閒的idle超時的執行緒在workerCount大於maximumPoolSize時及時銷燬
            interruptIdleWorkers();
        } else if (delta > 0) {
            // 差異大於0,代表著新值大於舊值

            // We don't really know how many new threads are "needed".
            // As a heuristic, prestart enough new workers (up to new
            // core size) to handle the current number of tasks in
            // queue, but stop if queue becomes empty while doing so.
            // 我們無法確切的知道有多少新的執行緒是所需要的。
            // 啟發式的預先啟動足夠的新工作執行緒用於處理工作佇列中的任務
            // 但當執行此操作時工作佇列為空了,則立即停止此操作(佇列為空了說明當前負載較低,再建立更多的工作執行緒是浪費資源)

            // 取差異和當前工作佇列中的最小值為k
            int k = Math.min(delta, workQueue.size());

            // 嘗試著一直增加新的工作執行緒,直到和k相同
            // 這樣設計的目的在於控制增加的核心執行緒數量,不要一下子建立過多核心執行緒
            // 舉個例子:原來的corePoolSize是10,且工作執行緒數也是10,現在新值設定為了30,新值比舊值大20,理論上應該直接建立20個核心工作執行緒
            // 而工作佇列中的任務數只有10,那麼這個時候直接建立20個新工作執行緒是沒必要的,只需要一個一個建立,在建立的過程中新的執行緒會盡量的消費工作佇列中的任務
            // 這樣就可以以一種啟發性的方式建立合適的新工作執行緒,一定程度上節約資源。後面再有新的任務提交時,再從runWorker方法中去單獨建立核心執行緒(類似惰性建立)
            while (k-- > 0 && addWorker(null, true)) {
                if (workQueue.isEmpty()) {
                    // 其它工作執行緒在迴圈的過程中也在消費工作執行緒,且使用者也可能不斷地提交任務
                    // 這是一個動態的過程,但一旦發現當前工作佇列為空則立即結束
                    break;
                }
            }
        }
    }

    /**
     * 動態更新最大執行緒數maximumPoolSize
     * */
    public void setMaximumPoolSize(int maximumPoolSize) {
        if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) {
            throw new IllegalArgumentException();
        }
        this.maximumPoolSize = maximumPoolSize;
        if (workerCountOf(this.ctl.get())  > maximumPoolSize) {
            // 更新完畢後,發現當前工作執行緒數超過了指定的值
            // 喚醒所有idle執行緒,讓目前空閒的idle超時的執行緒在workerCount大於maximumPoolSize時及時銷燬
            interruptIdleWorkers();
        }
    }

目前為止,通過v1版本的MyThreadPoolExecutor原始碼,已經將jdk執行緒池ThreadPoolExecutor在RUNNING狀態下提交任務,啟動工作執行緒執行任務相關的核心邏輯講解完畢了(不考慮優雅停止)。

jdk執行緒池預設支援的四種拒絕策略

jdk執行緒池支援使用者傳入自定義的拒絕策略處理器,只需要傳入實現了RejectedExecutionHandler介面的物件就行。
而jdk在ThreadPoolExecutor中提供了預設的四種拒絕策略方便使用者使用。

  1. AbortPolicy
    拒絕接受任務時會丟擲RejectedExecutionException,能讓提交任務的一方感知到異常的策略。適用於大多數場景,也是jdk預設的拒絕策略。
  2. DiscardPolicy
    直接丟棄任務的拒絕策略。簡單的直接丟棄任務,適用於對任務執行成功率要求不高的場合
  3. DiscardOldestPolicy
    丟棄當前工作佇列中最早入隊的任務,然後將當前任務重新提交。適用於後出現的任務能夠完全代替之前任務的場合(追求最終一致性)
  4. CallerRunsPolicy
    令呼叫者執行緒自己執行所提交任務的拒絕策略。線上程池壓力過大時,讓提交任務的執行緒自己執行該任務(非同步變同步),能有效地降低執行緒池的壓力,也不會丟失任務,但可能導致整體業務吞吐量大幅降低。

上面介紹的四種jdk預設拒絕策略分別適應不同的業務場景,需要使用者仔細考慮最適合的拒絕策略。同時靈活的、基於介面的設計也開放的支援使用者去自己實現更貼合自己業務的拒絕策略處理器。

    /**
    * 預設的拒絕策略:AbortPolicy
    * */
    private static final MyRejectedExecutionHandler defaultHandler = new MyAbortPolicy();
    
    /**
     * 丟擲RejectedExecutionException的拒絕策略
     * 評價:能讓提交任務的一方感知到異常的策略,比較通用,也是jdk預設的拒絕策略
     * */
    public static class MyAbortPolicy implements MyRejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable command, MyThreadPoolExecutor executor) {
            // 直接丟擲異常
            throw new RejectedExecutionException("Task " + command.toString() +
                    " rejected from " + executor.toString());
        }
    }

    /**
     * 令呼叫者執行緒自己執行command任務的拒絕策略
     * 評價:線上程池壓力過大時,讓提交任務的執行緒自己執行該任務(非同步變同步),
     *      能夠有效地降低執行緒池的壓力,也不會丟失任務,但可能導致整體業務吞吐量大幅降低
     * */
    public static class MyCallerRunsPolicy implements MyRejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable command, MyThreadPoolExecutor executor) {
            if (!executor.isShutdown()) {
                // 如果當前執行緒池不是shutdown狀態,則令呼叫者執行緒自己執行command任務
                command.run();
            }else{
                // 如果已經是shutdown狀態了,就什麼也不做直接丟棄任務
            }
        }
    }

    /**
     * 直接丟棄任務的拒絕策略
     * 評價:簡單的直接丟棄任務,適用於對任務執行成功率要求不高的場合
     * */
    public static class MyDiscardPolicy implements MyRejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable command, MyThreadPoolExecutor executor) {
            // 什麼也不做的,直接返回
            // 效果就是command任務被無聲無息的丟棄了,沒有異常
        }
    }

    /**
     * 丟棄當前工作佇列中最早入隊的任務,然後將當前任務重新提交
     * 評價:適用於後出現的任務能夠完全代替之前任務的場合(追求最終一致性)
     * */
    public static class MyDiscardOldestPolicy implements MyRejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable command, MyThreadPoolExecutor executor) {
            if (!executor.isShutdown()) {
                // 如果當前執行緒池不是shutdown狀態,則丟棄當前工作佇列中最早入隊的任務,然後將當前任務重新提交
                executor.getQueue().poll();
                executor.execute(command);
            }else{
                // 如果已經是shutdown狀態了,就什麼也不做直接丟棄任務
            }
        }
    }

jdk預設的四種執行緒池實現

jdk中除了提供了預設的拒絕策略,還在Executors類中提供了四種基於ThreadPoolExecutor的、比較常用的執行緒池,以簡化使用者對執行緒池的使用。
這四種執行緒池可以通過Executors提供的public方法來分別建立:

newFixedThreadPool

newFixedThreadPool方法建立一個工作執行緒數量固定的執行緒池,其建立ThreadPoolExecutor時傳入的核心執行緒數corePoolSize和最大執行緒數maximumPoolSize是相等的。
因此其工作佇列傳入是一個無界的LinkedBlockingQueue,無界的工作佇列意味著永遠都不會建立新的非核心執行緒。
在預設allowCoreThreadTimeOut為false的情況下,執行緒池中的所有執行緒都是不會因為idle超時而銷燬的核心執行緒。
適用場景:由於工作執行緒數量固定,「fixedThreadPool」適用於任務流量較為穩定的場景

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
newCachedThreadPool

newCachedThreadPool方法建立一個工作執行緒數量有巨大彈性的執行緒池,其核心執行緒數corePoolSize=0而最大執行緒數maximumPoolSize為Integer.MAX_VALUE,60s的保活時間。
同時其工作佇列是SynchronousQueue,是一種佇列容量為0、無法快取任何任務的阻塞佇列(任何時候插入資料(offer)時必須有消費者執行緒消費,否則生產者執行緒將會被阻塞)。
這也意味著「cachedThreadPool」中沒有核心執行緒,所有工作執行緒在任務負載較低時都會在60s的idle後被銷燬;同時當負載較高,新任務到來時由於所有的工作執行緒都在執行其它任務,將會立即建立一個新的非核心執行緒來處理任務。
適用場景:由於可以無限制的建立新執行緒來做到及時響應任務,「cachedThreadPool」適用於任務流量較大且不穩定,對任務延遲容忍度較低的場景

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
newSingleThreadExecutor

newSingleThreadExecutor方法建立一個單執行緒的執行緒池,其核心執行緒數corePoolSize=1且最大執行緒數maximumPoolSize也為1,其工作佇列是無界佇列。
這意味著「singleThreadExecutor」中任何提交的任務都將嚴格按照先入先出的順序被執行。
適用場景:「singleThreadExecutor」適用於任務量較小、對任務延遲容忍度較高、並要求任務順序執行的場景。

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
newScheduledThreadPool

newScheduledThreadPool方法建立一個支援定時任務、延遲任務執行的執行緒池(關於jdk定時任務執行緒池ScheduledThreadPoolExecutor的工作原理會在未來的部落格中展開)
適用場景:「scheduledThreadPool」適用於需要任務定時或者延遲執行的場景。

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
        new DelayedWorkQueue());
    }
jdk預設提供的執行緒池的缺陷
  • 無論是newCachedThreadPool還是newFixedThreadPool、newSingleThreadExecutor,其設定的最大執行緒數量(Integer.MAX_VALUE)和無界的工作佇列(new LinkedBlockingQueue())都缺乏必要的限制。
    在生產環境中很容易因為任務流量過大導致建立過多的工作執行緒或令無界的工作佇列堆積大量的任務物件而耗盡CPU和記憶體等系統資源,最終導致程式崩潰。
    這也是為什麼阿里巴巴的開發規範中推薦使用更基礎的ThreadPoolExecutor建構函式來建立所需要的執行緒池。
  • 只有在瞭解ThreadPoolExecutor工作原理以及各項設定引數的具體作用後,才能根據具體的業務和硬體設定來設定最合適的引數值。

總結

  • 這篇部落格中我們首先介紹了執行緒池的基本概念,隨後在原始碼層面解析了jdk預設的執行緒池ThreadPoolExecutor在執行所提交任務的整體工作原理(RUNNING狀態),
    並在最後簡單的分析了jdk預設提供的四種拒絕策略和四種執行緒池的適用場景。
  • 希望通過這篇部落格能讓讀者更好的理解執行緒池的工作原理,並在工作中更好的使用執行緒池。關於ThreadPoolExecutor優雅停止的原理會在下一篇部落格中進行詳細的分析,盡請期待。
  • 本篇部落格的完整程式碼在我的github上:https://github.com/1399852153/Reinventing-the-wheel-for-learning(ThreadPool模組) 內容如有錯誤,還請多多指教。