【高並行】通過ThreadPoolExecutor類的原始碼深度解析執行緒池執行任務的核心流程

2022-06-16 06:01:25

核心邏輯概述

ThreadPoolExecutor是Java執行緒池中最核心的類之一,它能夠保證執行緒池按照正常的業務邏輯執行任務,並通過原子方式更新執行緒池每個階段的狀態。

ThreadPoolExecutor類中存在一個workers工作執行緒集合,使用者可以向執行緒池中新增需要執行的任務,workers集合中的工作執行緒可以直接執行任務,或者從任務佇列中獲取任務後執行。ThreadPoolExecutor類中提供了整個執行緒池從建立到執行任務,再到消亡的整個流程方法。本文,就結合ThreadPoolExecutor類的原始碼深度分析執行緒池執行任務的整體流程。

在ThreadPoolExecutor類中,執行緒池的邏輯主要體現在execute(Runnable)方法,addWorker(Runnable, boolean)方法,addWorkerFailed(Worker)方法和拒絕策略上,接下來,我們就深入分析這幾個核心方法。

execute(Runnable)方法

execute(Runnable)方法的作用是提交Runnable型別的任務到執行緒池中。我們先看下execute(Runnable)方法的原始碼,如下所示。

public void execute(Runnable command) {
	//如果提交的任務為空,則丟擲空指標異常
	if (command == null)
		throw new NullPointerException();
	//獲取執行緒池的狀態和執行緒池中執行緒的數量
	int c = ctl.get();
	//執行緒池中的執行緒數量小於corePoolSize的值
	if (workerCountOf(c) < corePoolSize) {
		//重新開啟執行緒執行任務
		if (addWorker(command, true))
			return;
		c = ctl.get();
	}
	//如果執行緒池處於RUNNING狀態,則將任務新增到阻塞佇列中
	if (isRunning(c) && workQueue.offer(command)) {
		//再次獲取執行緒池的狀態和執行緒池中執行緒的數量,用於二次檢查
		int recheck = ctl.get();
		//如果執行緒池沒有未處於RUNNING狀態,從佇列中刪除任務
		if (! isRunning(recheck) && remove(command))
			//執行拒絕策略
			reject(command);
		//如果執行緒池為空,則向執行緒池中新增一個執行緒
		else if (workerCountOf(recheck) == 0)
			addWorker(null, false);
	}
	//任務佇列已滿,則新增worker執行緒,如果新增執行緒失敗,則執行拒絕策略
	else if (!addWorker(command, false))
		reject(command);
}

整個任務的執行流程,我們可以簡化成下圖所示。

接下來,我們拆解execute(Runnable)方法,具體分析execute(Runnable)方法的執行邏輯。

(1)執行緒池中的執行緒數是否小於corePoolSize核心執行緒數,如果小於corePoolSize核心執行緒數,則向workers工作執行緒集合中新增一個核心執行緒執行任務。程式碼如下所示。

//執行緒池中的執行緒數量小於corePoolSize的值
if (workerCountOf(c) < corePoolSize) {
	//重新開啟執行緒執行任務
	if (addWorker(command, true))
		return;
	c = ctl.get();
}

(2)如果執行緒池中的執行緒數量大於corePoolSize核心執行緒數,則判斷當前執行緒池是否處於RUNNING狀態,如果處於RUNNING狀態,則新增任務到待執行的任務佇列中。注意:這裡向任務佇列新增任務時,需要判斷執行緒池是否處於RUNNING狀態,只有執行緒池處於RUNNING狀態時,才能向任務佇列新增新任務。否則,會執行拒絕策略。程式碼如下所示。

if (isRunning(c) && workQueue.offer(command)) 

(3)向任務佇列中新增任務成功,由於其他執行緒可能會修改執行緒池的狀態,所以這裡需要對執行緒池進行二次檢查,如果當前執行緒池的狀態不再是RUNNING狀態,則需要將新增的任務從任務佇列中移除,執行後續的拒絕策略。如果當前執行緒池仍然處於RUNNING狀態,則判斷執行緒池是否為空,如果執行緒池中不存在任何執行緒,則新建一個執行緒新增到執行緒池中,如下所示。

//再次獲取執行緒池的狀態和執行緒池中執行緒的數量,用於二次檢查
int recheck = ctl.get();
//如果執行緒池沒有未處於RUNNING狀態,從佇列中刪除任務
if (! isRunning(recheck) && remove(command))
	//執行拒絕策略
	reject(command);
//如果執行緒池為空,則向執行緒池中新增一個執行緒
else if (workerCountOf(recheck) == 0)
	addWorker(null, false);

(4)如果在步驟(3)中向任務佇列中新增任務失敗,則嘗試開啟新的執行緒執行任務。此時,如果執行緒池中的執行緒數量已經大於執行緒池中的最大執行緒數maximumPoolSize,則不能再啟動新執行緒。此時,表示執行緒池中的任務佇列已滿,並且執行緒池中的執行緒已滿,需要執行拒絕策略,程式碼如下所示。

//任務佇列已滿,則新增worker執行緒,如果新增執行緒失敗,則執行拒絕策略
else if (!addWorker(command, false))
	reject(command);

這裡,我們將execute(Runnable)方法拆解,結合流程圖來理解執行緒池中任務的執行流程就比較簡單了。可以這麼說,execute(Runnable)方法的邏輯基本上就是一般執行緒池的執行邏輯,理解了execute(Runnable)方法,就基本理解了執行緒池的執行邏輯。

注意:有關ScheduledThreadPoolExecutor類和ForkJoinPool類執行執行緒池的邏輯,在【高並行專題】系列文章中的後文中會詳細說明,理解了這些類的執行邏輯,就基本全面掌握了執行緒池的執行流程。

在分析execute(Runnable)方法的原始碼時,我們發現execute(Runnable)方法中多處呼叫了addWorker(Runnable, boolean)方法,接下來,我們就一起分析下addWorker(Runnable, boolean)方法的邏輯。

addWorker(Runnable, boolean)方法

總體上,addWorker(Runnable, boolean)方法可以分為三部分,第一部分是使用CAS安全的向執行緒池中新增工作執行緒;第二部分是建立新的工作執行緒;第三部分則是將任務通過安全的並行方式新增到workers中,並啟動工作執行緒執行任務。

接下來,我們看下addWorker(Runnable, boolean)方法的原始碼,如下所示。

private boolean addWorker(Runnable firstTask, boolean core) {
	//標記重試的標識
	retry:
	for (;;) {
		int c = ctl.get();
		int rs = runStateOf(c);

		// 檢查佇列是否在某些特定的條件下為空
		if (rs >= SHUTDOWN &&
			! (rs == SHUTDOWN &&
			   firstTask == null &&
			   ! workQueue.isEmpty()))
			return false;
		//下面迴圈的主要作用為通過CAS方式增加執行緒的個數
		for (;;) {
			//獲取執行緒池中的執行緒數量
			int wc = workerCountOf(c);
			//如果執行緒池中的執行緒數量超出限制,直接返回false
			if (wc >= CAPACITY ||
				wc >= (core ? corePoolSize : maximumPoolSize))
				return false;
			//通過CAS方式向執行緒池新增執行緒數量
			if (compareAndIncrementWorkerCount(c))
				//通過CAS方式保證只有一個執行緒執行成功,跳出最外層迴圈
				break retry;
			//重新獲取ctl的值
			c = ctl.get();  
			//如果CAS操作失敗了,則需要在內迴圈中重新嘗試通過CAS新增執行緒數量
			if (runStateOf(c) != rs)
				continue retry;
		}
	}
	
	//跳出最外層for迴圈,說明通過CAS新增執行緒數量成功
	//此時建立新的工作執行緒
	boolean workerStarted = false;
	boolean workerAdded = false;
	Worker w = null;
	try {
		//將執行的任務封裝成worker
		w = new Worker(firstTask);
		final Thread t = w.thread;
		if (t != null) {
			//獨佔鎖,保證操作workers時的同步
			final ReentrantLock mainLock = this.mainLock;
			mainLock.lock();
			try {
				//此處需要重新檢查執行緒池狀態
				//原因是在獲得鎖之前可能其他的執行緒改變了執行緒池的狀態
				int rs = runStateOf(ctl.get());
				
				if (rs < SHUTDOWN ||
					(rs == SHUTDOWN && firstTask == null)) {
					if (t.isAlive())
						throw new IllegalThreadStateException();
					//向worker中新增新任務
					workers.add(w);
					int s = workers.size();
					if (s > largestPoolSize)
						largestPoolSize = s;
					//將是否新增了新任務的標識設定為true
					workerAdded = true;
				}
			} finally {
				//釋放獨佔鎖
				mainLock.unlock();
			}
			//新增新任成功,則啟動執行緒執行任務
			if (workerAdded) {
				t.start();
				//將任務是否已經啟動的標識設定為true
				workerStarted = true;
			}
		}
	} finally {
		//如果任務未啟動或啟動失敗,則呼叫addWorkerFailed(Worker)方法
		if (! workerStarted)
			addWorkerFailed(w);
	}
	//返回是否啟動任務的標識
	return workerStarted;
}

乍一看,addWorker(Runnable, boolean)方法還蠻長的,這裡,我們還是將addWorker(Runnable, boolean)方法進行拆解。

(1)檢查任務佇列是否在某些特定的條件下為空,程式碼如下所示。

// 檢查佇列是否在某些特定的條件下為空
if (rs >= SHUTDOWN &&
	! (rs == SHUTDOWN &&
	   firstTask == null &&
	   ! workQueue.isEmpty()))
	return false;

(2)在通過步驟(1)的校驗後,則進入內層for迴圈,在內層for迴圈中通過CAS來增加執行緒池中的執行緒數量,如果CAS操作成功,則直接退出雙重for迴圈。如果CAS操作失敗,則檢視當前執行緒池的狀態是否發生了變化,如果執行緒池的狀態發生了變化,則通過continue關鍵字重新通過外層for迴圈校驗任務佇列,檢驗通過再次執行內層for迴圈的CAS操作。如果執行緒池的狀態沒有發生變化,此時上一次CAS操作失敗了,則繼續嘗試CAS操作。程式碼如下所示。

for (;;) {
	//獲取執行緒池中的執行緒數量
	int wc = workerCountOf(c);
	//如果執行緒池中的執行緒數量超出限制,直接返回false
	if (wc >= CAPACITY ||
		wc >= (core ? corePoolSize : maximumPoolSize))
		return false;
	//通過CAS方式向執行緒池新增執行緒數量
	if (compareAndIncrementWorkerCount(c))
		//通過CAS方式保證只有一個執行緒執行成功,跳出最外層迴圈
		break retry;
	//重新獲取ctl的值
	c = ctl.get();  
	//如果CAS操作失敗了,則需要在內迴圈中重新嘗試通過CAS新增執行緒數量
	if (runStateOf(c) != rs)
		continue retry;
}

(3)CAS操作成功後,表示向執行緒池中成功新增了工作執行緒,此時,還沒有執行緒去執行任務。使用全域性的獨佔鎖mainLock來將新增的工作執行緒Worker物件安全的新增到workers中。

總體邏輯就是:建立新的Worker物件,並獲取Worker物件中的執行執行緒,如果執行緒不為空,則獲取獨佔鎖,獲取鎖成功後,再次檢查線執行緒的狀態,這是避免在獲取獨佔鎖之前其他執行緒修改了執行緒池的狀態,或者關閉了執行緒池。如果執行緒池關閉,則需要釋放鎖。否則將新增加的執行緒新增到工作集合中,釋放鎖並啟動執行緒執行任務。將是否啟動執行緒的標識設定為true。最後,判斷執行緒是否啟動,如果沒有啟動,則呼叫addWorkerFailed(Worker)方法。最終返回執行緒是否起送的標識。

//跳出最外層for迴圈,說明通過CAS新增執行緒數量成功
//此時建立新的工作執行緒
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
	//將執行的任務封裝成worker
	w = new Worker(firstTask);
	final Thread t = w.thread;
	if (t != null) {
		//獨佔鎖,保證操作workers時的同步
		final ReentrantLock mainLock = this.mainLock;
		mainLock.lock();
		try {
			//此處需要重新檢查執行緒池狀態
			//原因是在獲得鎖之前可能其他的執行緒改變了執行緒池的狀態
			int rs = runStateOf(ctl.get());
			
			if (rs < SHUTDOWN ||
				(rs == SHUTDOWN && firstTask == null)) {
				if (t.isAlive())
					throw new IllegalThreadStateException();
				//向worker中新增新任務
				workers.add(w);
				int s = workers.size();
				if (s > largestPoolSize)
					largestPoolSize = s;
				//將是否新增了新任務的標識設定為true
				workerAdded = true;
			}
		} finally {
			//釋放獨佔鎖
			mainLock.unlock();
		}
		//新增新任成功,則啟動執行緒執行任務
		if (workerAdded) {
			t.start();
			//將任務是否已經啟動的標識設定為true
			workerStarted = true;
		}
	}
} finally {
	//如果任務未啟動或啟動失敗,則呼叫addWorkerFailed(Worker)方法
	if (! workerStarted)
		addWorkerFailed(w);
}
//返回是否啟動任務的標識
return workerStarted;

addWorkerFailed(Worker)方法

在addWorker(Runnable, boolean)方法中,如果新增工作執行緒失敗或者工作執行緒啟動失敗時,則會呼叫addWorkerFailed(Worker)方法,下面我們就來看看addWorkerFailed(Worker)方法的實現,如下所示。

private void addWorkerFailed(Worker w) {
	//獲取獨佔鎖
	final ReentrantLock mainLock = this.mainLock;
	mainLock.lock();
	try {
		//如果Worker任務不為空
		if (w != null)
			//將任務從workers集合中移除
			workers.remove(w);
		//通過CAS將任務數量減1
		decrementWorkerCount();
		tryTerminate();
	} finally {
		//釋放鎖
		mainLock.unlock();
	}
}

addWorkerFailed(Worker)方法的邏輯就比較簡單了,獲取獨佔鎖,將任務從workers中移除,並且通過CAS將任務的數量減1,最後釋放鎖。

拒絕策略

我們在分析execute(Runnable)方法時,執行緒池會在適當的時候呼叫reject(Runnable)方法來執行相應的拒絕策略,我們看下reject(Runnable)方法的實現,如下所示。

final void reject(Runnable command) {
	handler.rejectedExecution(command, this);
}

通過程式碼,我們發現呼叫的是handler的rejectedExecution方法,handler又是個什麼鬼,我們繼續跟程序式碼,如下所示。

private volatile RejectedExecutionHandler handler;

再看看RejectedExecutionHandler是個啥型別,如下所示。

package java.util.concurrent;

public interface RejectedExecutionHandler {

    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

可以發現RejectedExecutionHandler是個介面,定義了一個rejectedExecution(Runnable, ThreadPoolExecutor)方法。既然RejectedExecutionHandler是個介面,那我們就看看有哪些類實現了RejectedExecutionHandler介面。

看到這裡,我們發現RejectedExecutionHandler介面的實現類正是執行緒池預設提供的四種拒絕策略的實現類。

至於reject(Runnable)方法中具體會執行哪個類的拒絕策略,是根據建立執行緒池時傳遞的引數決定的。如果沒有傳遞拒絕策略,則預設會執行AbortPolicy類的拒絕策略。否則會執行傳遞的類的拒絕策略。

在建立執行緒池時,除了能夠傳遞JDK預設提供的拒絕策略外,還可以傳遞自定義的拒絕策略。如果想使用自定義的拒絕策略,則只需要實現RejectedExecutionHandler介面,並重寫rejectedExecution(Runnable, ThreadPoolExecutor)方法即可。例如,下面的程式碼。

public class CustomPolicy implements RejectedExecutionHandler {

	public CustomPolicy() { }

	public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
		if (!e.isShutdown()) {
			System.out.println("使用呼叫者所在的執行緒來執行任務")
			r.run();
		}
	}
}

使用如下方式建立執行緒池。

new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                       60L, TimeUnit.SECONDS,
                       new SynchronousQueue<Runnable>(),
                       Executors.defaultThreadFactory(),
		       new CustomPolicy());

至此,執行緒池執行任務的整體核心邏輯分析結束。

好了,今天就到這兒吧,我是冰河,我們下期見~~