【Java執行緒池】 java.util.concurrent.ThreadPoolExecutor 原始碼分析

2022-07-30 18:02:09

執行緒池概述

執行緒池,是指管理一組同構工作執行緒的資源池。
執行緒池在工作佇列(Work Queue)中儲存了所有等待執行的任務。工作者執行緒(Work Thread)會從工作佇列中獲取一個任務並執行,然後返回執行緒池並等待下一個任務。

執行緒池比執行任務再建立執行緒會有以下優勢:

  1. 節省資源。通過重用執行緒來省去線上程建立和銷燬過程中產生的開銷。
  2. 提高響應性。當執行請求到達時,工作執行緒通常已經存在,因此不需要等待執行緒建立,從而提高了響應性。
  3. 防止資源耗盡。通過調整執行緒池的大小,防止過多執行緒相互競爭資源。

ThreadPoolExecutor 是Java中執行緒池的實現類。下圖是繼承關係:

classDiagram direction BT class AbstractExecutorService class Executor { <<Interface>> } class ExecutorService { <<Interface>> } class ThreadPoolExecutor AbstractExecutorService ..> ExecutorService ExecutorService --> Executor ThreadPoolExecutor --> AbstractExecutorService
  • Executor ,基於生產者-消費者模式,提交相當於生產者,執行任務相當於消費者。通過該模式將任務的提交與執行解耦開來。
  • ExecutorService,是Executor 的實現,該介面基於Executor 提供了生命週期的支援,例如任務的執行、關閉和中止,以及通過 submit 方法來建立一個非同步任務 Future
  • AbstractExecutorService,是ExecutorService 的實現,使得下層實現只需要關注任務的執行。
  • ThreadPoolExecutor,是Java中執行緒池的實現。

下圖是ThreadPoolExecutor的大致結構

> 參考 [Java執行緒池實現原理及其在美團業務中的實踐](https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html)

生命週期

Executor 的實現通常會建立執行緒來執行任務。由於 Executor 以非同步的方式來執行任務,因此之前提交任務的狀態不是立即可見的,有些任務已經完成,有些還在執行,有些在等待執行。當關閉應用程式時,是完成當前任務並不接受新任務,還是直接關閉所有任務(不管是在執行還是沒有執行)。
為了解決執行服務的生命週期問題,Executor 擴充套件了 ExecutorService 介面,該介面提供了對生命週期管理的方法。

public interface ExecutorService extends Executor {
	void shutdown();
	List<Runnable> shutdownNow();
	boolean awaitTermination(long timeout, TimeUnit unit);
	boolean isShutdown();
	boolean isTerminated();
	
	// 以及一些用於建立非同步任務的方法
	<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks);
	Future<?> submit(Runnable task);
	....	
}

在ThreadPoolExecutor中提供了五種狀態:

狀態 描述 轉換
RUNTIME 接受新任務並處理佇列任務
SHUTDOWN 不接受新任務,但是處理佇列中任務 RUNTIME->SHUTDOWN:執行 shutdown()
STOP 不接受新任務,不處理佇列中任務,並中斷正在執行中的任務 RUNTIME->STOP:執行 shutdownNow()
TIDYING 所有任務已經終止,工作者執行緒為0,之後會執行一個勾點函數(TERMINATED())用於清理 SHUTDOWN->TIDYING:佇列和執行緒池都為空
STOP->TIDYING: 執行緒池為空
TERMINATED 勾點函數執行完畢 TIDYING->TERMINATED

下圖是狀態之間的轉換關係:

如果任務佇列被填滿(在佇列大小有限的情況下)或者某個任務被提交到一個已經被關閉的Executor中時應該怎麼處理這些情況?JDK提供了一種策略來處理這些情況--飽和策略

ThreadPoolExecutor中通過ctl欄位來維護了執行緒池的執行狀態和執行緒數量(工作者執行緒)

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

具體的可以通過兩個引數來說明:

  • COUNT_BIT的值為29(32-3)
  • COUNT_MASK 則為高三位為0、低29位全1的欄位
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

每個狀態的取值如下,每個取值都向左邊移動了29位:

private static final int RUNNING    = -1 << COUNT_BITS;  
private static final int SHUTDOWN   =  0 << COUNT_BITS;  
private static final int STOP       =  1 << COUNT_BITS;  
private static final int TIDYING    =  2 << COUNT_BITS;  
private static final int TERMINATED =  3 << COUNT_BITS;

所以高三位用來表示執行緒池的狀態
之後用來確定執行緒池中的執行緒數量都使用COUNT_MASK來計算,這樣就能計算低29位

private static int workerCountOf(int c)  { return c & COUNT_MASK; }

ThreadPoolExecutor 通過如下的方法來檢測當前執行緒池的狀態:
執行是要比終結狀態小的

// 當前狀態要比給定的狀態小,如 running < terminated
private static boolean runStateLessThan(int c, int s) {  
    return c < s;  
}  
// 當前狀態要高於給定的狀態,如 terminated >= running
private static boolean runStateAtLeast(int c, int s) {  
    return c >= s;  
}  
// 檢測當前執行緒是否在執行
private static boolean isRunning(int c) {  
    return c < SHUTDOWN;  
}

飽和(拒絕)策略

ThreadPoolExecutor的飽和策略可以通過呼叫 setRejectedtExecutionHandler 來修改。
JDK 提供了幾種不同的 RejectedExecutionHandler 的實現:

名稱 描述
AbortPolicy 預設的飽和策略,會丟擲一個 RejectedExecutionException
可以捕獲這個異常然後進行處理
CallerRunsPolicy 該策略實現了一種調節機制,不會拋棄任務也不會丟擲異常。
而是將該任務在呼叫者執行緒中執行,使得呼叫者需要執行完該任務才能繼續提交任務。
這樣使得工作者執行緒有時間來處理正在執行的任務。
DiscardPolicy 會拋棄任務,不會提醒任務被丟擲
DiscardOldestPolicy 拋棄下一個將被執行的任務並將該任務重新提交

佇列任務管理

如果請求速率超過了執行緒池的處理速率,那麼新到來的請求將會累計起來,這些請求會在一個由 Executor 管理的 Runnable 佇列(也就是任務佇列 Work Queue)中等待。但這仍有可能超出快取的數量。
基本的任務排隊方法有三種:

  • 有界佇列:有長度限制
  • 無界佇列:沒有長度限制
  • 同步移交:立即將元素傳輸給正在等待的消費者
    BlockingQueueThreadPoolExecutor 的任務佇列:
public interface BlockingQueue<E> extends Queue<E> {
	offer();
	pool();
	put();
	take();
	....
}

JDK 提供了以下幾種實現類:

類名 描述 型別
LinkedBlockingQueue 由連結串列組成,此佇列按照FIFO對元素進行排序。預設長度為 Integer.MAX_VALUE 有界阻塞佇列
PriorityBlockingQueue 優先佇列,預設按從小到大(自然序)進行排序。不能保證同優先順序元素的順序 無界阻塞佇列
ArrayBlockingQueue 由陣列組成,按照FIFO對元素進行排序。支援公平鎖和非公平鎖,預設使用非公平鎖 有界阻塞佇列
SynchronousQueue 不是真正的佇列,是一種特殊的阻塞佇列,沒有實際的容量,任意執行緒都會等待直到獲得資料或者交付完成才會返回 無界阻塞同步移交
DelayQueue 實現了PriorityBlockingQueue的延遲的無界佇列,指定多久能從佇列中獲取元素。 無界佇列
LinkedTransferQueue 多實現了一個 TransferQueue 介面,支援將元素移交給正在等待的消費者 無界佇列
LinkedBlockingDeque 由連結串列組成的雙向阻塞佇列,得益於雙向佇列的特性,多執行緒並行時,可以將鎖的競爭最多降到一半 阻塞佇列

工作者執行緒

所有的任務都是由工作者執行緒來執行的,那麼工作者執行緒是如何執行這些任務的,以及執行緒池是如何維護工作者執行緒的。下面是ThreadPoolExecutor中的工作者執行緒類Worker

private final class Worker  
    extends AbstractQueuedSynchronizer  
    implements Runnable  
{
	// 執行緒
	final Thread thread;
	// 第一個任務
	Runnable firstTask;
	// 執行任務的數量
	volatile long completedTasks;
	....
}

工作者執行緒實現了Runnable介面,並且它包含了一個Thread ,說明工作者執行緒是一個特殊的任務也是一個執行緒,它可以去執行一些其他任務也可以自控制。

工作者執行緒繼承自AQS,而不是使用的可重入鎖ReentrantLock,目的是實現可不重入的特性(執行緒封閉)。每個工作者執行緒必須是執行緒封閉的。
在工作者執行緒構造器中有一段:

this.thread = getThreadFactory().newThread(this);

通過呼叫執行緒工廠建立執行緒並將this(也就是工作者執行緒)提交給Thread
也就是執行Thread就啟動了自己,所以工作者執行緒可以自己管理自己
addWorkder中建立工作者執行緒之後執行了它,這裡的t就是thread

t.start();  

執行緒池中的工作者執行緒的狀態由兩個欄位來控制:

// 生存時間
private volatile long keepAliveTime;
// 是否允許核心執行緒超時等待
private volatile boolean allowCoreThreadTimeOut;

keepAliveTime 是當執行緒數量大於核心執行緒數數量時工作者執行緒沒有任務時存活的時間
例如當前工作者執行緒數量是30,核心執行緒數量上限是20,最大執行緒數量是30。那麼多出來10個執行緒線上程池比較空閒的時候是需要清除的,因為這是佔用了多餘的系統資源。
keepAliveTime是為了保證突然之間執行緒池繁忙的情況,這時候就沒必要立馬清除這些執行緒,可以"等等看"有沒有突發情況。

allowCoreThreadTimeOut 則是使得核心執行緒也受keepAliveTime的影響
這些具體體現在從佇列中獲取任務的時候,下面會詳細描述

執行緒工廠

每當執行緒池建立一個執行緒,都是通過執行緒的工廠方法建立的。
預設的執行緒工廠方法建立一個新的、非守護的執行緒,並且不包含特殊的設定資訊。通過制定一個工廠方法,可以客製化執行緒池的設定資訊。每當執行緒池需要一個新的執行緒都會呼叫getThreadFactory()這個方法。

以下是ThreadPoolExecutor 建立一個工作者執行緒,是通過工廠方法建立的:

Worker(Runnable firstTask) {  
    setState(-1); // inhibit interrupts until runWorker  
    this.firstTask = firstTask;  
    this.thread = getThreadFactory().newThread(this);  
}

ThreadFactory 是一個介面

public interface ThreadFactory {  
  Thread newThread(Runnable r);  
}

在構造執行緒池ThreadPoolExecutor時,可以傳入一個執行緒工廠,使得建立執行緒時通過該執行緒工廠建立。

執行一個任務

ThreadPoolExecutor 是通過實現Executor來執行任務的
具體分為三個步驟:

  1. 如果當前正在執行的執行緒小於corePoolSize則建立一個工作者執行緒並將該任務作為該執行緒的第一個任務執行
  2. 如果當前任務能夠進入任務佇列,仍需要檢查執行緒池的執行狀態。如果執行緒池關閉則需要將任務交給飽和策略處理。如果沒有關閉並且工作者執行緒為0,則需要建立工作者執行緒(這個時候任務已經在佇列中)。
  3. 如果不能夠進入佇列,則嘗試建立工作者執行緒去處理任務;如果失敗則說明已經飽和,則將任務交給飽和(拒絕)策略處理。

copy from Java執行緒池實現原理及其在美團業務中的實踐

通過原始碼來理解一下:

public void execute(Runnable command) {  
	// 任務不能為空
    if (command == null)  
        throw new NullPointerException();
	// 任務數量
	int c = ctl.get();  
    if (workerCountOf(c) < corePoolSize) {  
        if (addWorker(command, true))  
            return;  
        c = ctl.get();  
    }  
    if (isRunning(c) && workQueue.offer(command)) {  
        int recheck = ctl.get();  
        if (! isRunning(recheck) && remove(command))  
            reject(command);  
        else if (workerCountOf(recheck) == 0)  
            addWorker(null, false);  
    }  
    else if (!addWorker(command, false))  
        reject(command);  
}

當執行一個任務會將一個任務放到工作佇列中或者是直接建立一個工作者執行緒去執行該任務。
addWorker 是建立一個工作者執行緒並執行一個任務的(可以不執行一個任務)
這段程式碼主要做兩個工作:

  1. 保證能正常新增工作者執行緒,數量不能超過設定的範圍並且執行緒池沒有關閉。
  2. 向工作者執行緒組中新增一個工作者執行緒並且執行緒池在正常執行,之後執行該執行緒去執行任務。
private boolean addWorker(Runnable firstTask, boolean core) {  
	// 保證能新增工作者執行緒
    retry:  
    for (int c = ctl.get();;) {  
        // Check if queue empty only if necessary.  
        if (runStateAtLeast(c, SHUTDOWN)  
            && (runStateAtLeast(c, STOP)  
                || firstTask != null  
                || workQueue.isEmpty()))  
            return false;  
  
        for (;;) {
	        // 如果超過限定數量,這個數量可以是最小的活躍執行緒數量可以是最大的活躍執行緒數量  
            if (workerCountOf(c)  
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;  
            if (compareAndIncrementWorkerCount(c))  
	            // 只有能增加工作者執行緒才退出
                break retry;  
            //重新獲取並檢查執行狀態
            c = ctl.get();  // Re-read ctl  
            if (runStateAtLeast(c, SHUTDOWN))  
                continue retry;  
            // else CAS failed due to workerCount change; retry inner loop  
        }  
    }  
	// 從這裡建立一個工作者執行緒並執行該執行緒
    boolean workerStarted = false;  
    boolean workerAdded = false;  
    Worker w = null;  
    try {  
        w = new Worker(firstTask);  
        final Thread t = w.thread;  
        if (t != null) {  
            final ReentrantLock mainLock = this.mainLock;  
            mainLock.lock();  
            try {  
	            // 檢測執行緒池執行狀態
                if (isRunning(c) ||  
                    (runStateLessThan(c, STOP) && firstTask == null)) {
                    // 檢測執行緒狀態,執行緒狀態必須為 NEW
                    if (t.getState() != Thread.State.NEW)  
                        throw new IllegalThreadStateException();  
                    // 將該工作者執行緒新增到工作者執行緒組中
                    workers.add(w);  
                    workerAdded = true;  
                    int s = workers.size();  
                    if (s > largestPoolSize)  
                        largestPoolSize = s;  
                }  
            } finally {  
                mainLock.unlock();  
            }  
            // 如果工作者執行緒已經新增則執行該執行緒
            if (workerAdded) {  
                t.start();  
                workerStarted = true;  
            }  
        }  
    } finally {  
        if (! workerStarted)  
            addWorkerFailed(w);  
    }  
    return workerStarted;  
}

獲取並執行任務

工作者執行緒獲取任務主要有兩個途徑:

  1. 通過建立工作者執行緒時給予的
  2. 從任務佇列中獲取的
    期間會一直檢查執行緒池的狀態。如果遇到執行緒池停止了需要去確保當前執行緒的中斷狀態
    如果執行緒池沒有終止也需要確保執行緒能夠正常執行
final void runWorker(Worker w) {  
    Thread wt = Thread.currentThread();  
    Runnable task = w.firstTask;  
    w.firstTask = null;  
    w.unlock(); // allow interrupts  
    boolean completedAbruptly = true;  
    try {  
	    // 有兩種情況
	    // 1. 執行firstTask
	    // 2. 獲取一個任務並執行
        while (task != null || (task = getTask()) != null) {  
            w.lock();
            // 如果執行緒池已經停止,確保當前執行緒是中斷的
            // 如果執行緒池正在執行,確保執行緒沒有中斷
	        // 第二次獲取控制狀態是為了確保執行緒池在關閉的過程中能夠正常中斷當前執行緒
			if ((runStateAtLeast(ctl.get(), STOP) ||  
				 (Thread.interrupted() &&  
				  runStateAtLeast(ctl.get(), STOP))) &&  
				!wt.isInterrupted())
				wt.interrupt();
			// 從這裡開始執行任務
            try {  
                beforeExecute(wt, task);  
                try {  
                    task.run();  
                    afterExecute(task, null);  
                } catch (Throwable ex) {  
                    afterExecute(task, ex);  
                    throw ex;  
                }  
            } finally {  
                task = null;
                // 增加任務執行數量,這裡是執行緒封閉的,所以不需要考慮並行的情況  
                w.completedTasks++;  
                w.unlock();  
            }  
        }  
        completedAbruptly = false;  
    } finally {  
	    // 執行緒自回收
        processWorkerExit(w, completedAbruptly);  
    }  
}

從任務佇列中獲取任務

工作者執行緒通過 getTask獲取一個任務來執行

private Runnable getTask() {  
    boolean timedOut = false; // Did the last poll() time out?  
  
    for (;;) {  
        int c = ctl.get();  
  
        // Check if queue empty only if necessary.  
        // 檢測執行緒池狀態,佇列不能為空
        if (runStateAtLeast(c, SHUTDOWN)  
            && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {  
            decrementWorkerCount();  
            return null;  
        }  
		// 
        int wc = workerCountOf(c);  
  
        // 這一段用來表示核心執行緒是否受影響
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;  
		// 工作者執行緒太多或者任務佇列為空
        if ((wc > maximumPoolSize || (timed && timedOut))  
            && (wc > 1 || workQueue.isEmpty())) {  
            // 減少工作者執行緒數量
            if (compareAndDecrementWorkerCount(c))  
                return null;  
            continue;  
        }  
		// 從佇列中獲取一個任務
        try {  
	        // 這一段體現了keepAliveTime的作用
	        // 超過keepAliveTime給定時間沒有獲取到任務,那麼執行緒將會被清理/回收掉
            Runnable r = timed ?  
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :  
                workQueue.take();  
            if (r != null)  
                return r;  
            timedOut = true;  
        } catch (InterruptedException retry) {  
            timedOut = false;  
        }  
    }  
}

工作者執行緒回收

processWorkerExit 負責這一工作,具體流程如下:

  1. 從工作者執行緒集合中移除
  2. 嘗試終止執行緒池
  3. 嘗試通過建立一個工作者執行緒來替換當前執行緒,這種情況可能由以下原因
    1. 當前被清除的執行緒可能由任務異常而退出
    2. 沒有工作者執行緒執行任務
    3. 佇列中沒有任務
private void processWorkerExit(Worker w, boolean completedAbruptly) {  
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted  
        decrementWorkerCount();  
  
    final ReentrantLock mainLock = this.mainLock;  
    mainLock.lock();  
    // 將工作者執行緒從集合中移除
    try {  
	    // 彙總執行完畢的任務數量
        completedTaskCount += w.completedTasks;  
        workers.remove(w);  
    } finally {  
        mainLock.unlock();  
    }  
	// 嘗試終止執行緒池
    tryTerminate();  
	// 3. 嘗試建立工作者執行緒
    int c = ctl.get();  
    if (runStateLessThan(c, STOP)) {  
        if (!completedAbruptly) {  
		    // 活躍的執行緒數量,其他的工作者執行緒可能超時回收了
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;  
            // 活躍的執行緒數量為0並且任務佇列為空
            if (min == 0 && ! workQueue.isEmpty())
			// 當前的活躍執行緒數量
                min = 1;
            // 實際的活躍執行緒數量要大於預測的就沒必要建立,因為執行緒數量夠用
            if (workerCountOf(c) >= min)  
                return; // replacement not needed  
        }  
        // 否則建立一個工作者執行緒來替代當前執行緒
        addWorker(null, false);  
    }  
}

參考

Java 並行程式設計實戰
Java執行緒池實現原理及其在美團業務中的實踐