【高並行】通過原始碼深度分析執行緒池中Worker執行緒的執行流程

2022-07-25 12:01:21

大家好,我是冰河~~

在《高並行之——通過ThreadPoolExecutor類的原始碼深度解析執行緒池執行任務的核心流程》一文中我們深度分析了執行緒池執行任務的核心流程,在ThreadPoolExecutor類的addWorker(Runnable, boolean)方法中,使用CAS安全的更新執行緒的數量之後,接下來就是建立新的Worker執行緒執行任務,所以,我們先來分析下Worker類的原始碼。

Worker類分析

Worker類從類的結構上來看,繼承了AQS(AbstractQueuedSynchronizer類)並實現了Runnable介面。本質上,Worker類既是一個同步元件,也是一個執行任務的執行緒。接下來,我們看下Worker類的原始碼,如下所示。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
	private static final long serialVersionUID = 6138294804551838833L;
	//執行任務的執行緒類
	final Thread thread;
	//初始化執行的任務,第一次執行的任務
	Runnable firstTask;
	//完成任務的計數
	volatile long completedTasks;
	//Worker類的構造方法,初始化任務並呼叫執行緒工廠建立執行任務的執行緒
	Worker(Runnable firstTask) {
		setState(-1); 
		this.firstTask = firstTask;
		this.thread = getThreadFactory().newThread(this);
	}
	//重寫Runnable介面的run()方法
	public void run() {
		//呼叫ThreadPoolExecutor類的runWorker(Worker)方法
		runWorker(this);
	}

	//檢測是否是否獲取到鎖
	//state=0表示未獲取到鎖
	//state=1表示已獲取到鎖
	protected boolean isHeldExclusively() {
		return getState() != 0;
	}
	
	//使用AQS設定執行緒狀態
	protected boolean tryAcquire(int unused) {
		if (compareAndSetState(0, 1)) {
			setExclusiveOwnerThread(Thread.currentThread());
			return true;
		}
		return false;
	}

	//嘗試釋放鎖
	protected boolean tryRelease(int unused) {
		setExclusiveOwnerThread(null);
		setState(0);
		return true;
	}

	public void lock()        { acquire(1); }
	public boolean tryLock()  { return tryAcquire(1); }
	public void unlock()      { release(1); }
	public boolean isLocked() { return isHeldExclusively(); }

	void interruptIfStarted() {
		Thread t;
		if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
			try {
				t.interrupt();
			} catch (SecurityException ignore) {
			}
		}
	}
}

在Worker類的構造方法中,可以看出,首先將同步狀態state設定為-1,設定為-1是為了防止runWorker方法執行之前被中斷。這是因為如果其他執行緒呼叫執行緒池的shutdownNow()方法時,如果Worker類中的state狀態的值大於0,則會中斷執行緒,如果state狀態的值為-1,則不會中斷執行緒。

Worker類實現了Runnable介面,需要重寫run方法,而Worker的run方法本質上呼叫的是ThreadPoolExecutor類的runWorker方法,在runWorker方法中,會首先呼叫unlock方法,該方法會將state置為0,所以這個時候呼叫shutDownNow方法就會中斷當前執行緒,而這個時候已經進入了runWork方法,就不會在還沒有執行runWorker方法的時候就中斷執行緒。

注意:大家需要重點理解Worker類的實現。

Worker類中呼叫了ThreadPoolExecutor類的runWorker(Worker)方法。接下來,我們一起看下ThreadPoolExecutor類的runWorker(Worker)方法的實現。

runWorker(Worker)方法

首先,我們看下RunWorker(Worker)方法的原始碼,如下所示。

final void runWorker(Worker w) {
	Thread wt = Thread.currentThread();
	Runnable task = w.firstTask;
	w.firstTask = null;
	//釋放鎖,將state設定為0,允許中斷任務的執行
	w.unlock();
	boolean completedAbruptly = true;
	try {
		//如果任務不為空,或者從任務佇列中獲取的任務不為空,則執行while迴圈
		while (task != null || (task = getTask()) != null) {
			//如果任務不為空,則獲取Worker工作執行緒的獨佔鎖
			w.lock();
			//如果執行緒已經停止,或者中斷執行緒後執行緒終止並且沒有成功中斷執行緒
			//大家好好理解下這個邏輯
			if ((runStateAtLeast(ctl.get(), STOP) ||
				 (Thread.interrupted() &&
				  runStateAtLeast(ctl.get(), STOP))) &&
				!wt.isInterrupted())
				//中斷執行緒
				wt.interrupt();
			try {
				//執行任務前執行的邏輯
				beforeExecute(wt, task);
				Throwable thrown = null;
				try {
					//呼叫Runable介面的run方法執行任務
					task.run();
				} catch (RuntimeException x) {
					thrown = x; throw x;
				} catch (Error x) {
					thrown = x; throw x;
				} catch (Throwable x) {
					thrown = x; throw new Error(x);
				} finally {
					//執行任務後執行的邏輯
					afterExecute(task, thrown);
				}
			} finally {
				//任務執行完成後,將其設定為空
				task = null;
				//完成的任務數量加1
				w.completedTasks++;
				//釋放工作執行緒獲得的鎖
				w.unlock();
			}
		}
		completedAbruptly = false;
	} finally {
		//執行退出Worker執行緒的邏輯
		processWorkerExit(w, completedAbruptly);
	}
}

這裡,我們拆解runWorker(Worker)方法。

(1)獲取當前執行緒的控制程式碼和工作執行緒中的任務,並將工作執行緒中的任務設定為空,執行unlock方法釋放鎖,將state狀態設定為0,此時可以中斷工作執行緒,程式碼如下所示。

Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
//釋放鎖,將state設定為0,允許中斷任務的執行
w.unlock();

(2)在while迴圈中進行判斷,如果任務不為空,或者從任務佇列中獲取的任務不為空,則執行while迴圈,否則,呼叫processWorkerExit(Worker, boolean)方法退出Worker工作執行緒。

while (task != null || (task = getTask()) != null)

(3)如果滿足while的迴圈條件,首先獲取工作執行緒內部的獨佔鎖,並執行一系列的邏輯判斷來檢測是否需要中斷當前執行緒的執行,程式碼如下所示。

//如果任務不為空,則獲取Worker工作執行緒的獨佔鎖
w.lock();
//如果執行緒已經停止,或者中斷執行緒後執行緒終止並且沒有成功中斷執行緒
//大家好好理解下這個邏輯
if ((runStateAtLeast(ctl.get(), STOP) ||
	 (Thread.interrupted() &&
	  runStateAtLeast(ctl.get(), STOP))) &&
	!wt.isInterrupted())
	//中斷執行緒
	wt.interrupt();

(4)呼叫執行任務前執行的邏輯,如下所示

//執行任務前執行的邏輯
beforeExecute(wt, task);

(5)呼叫Runable介面的run方法執行任務

//呼叫Runable介面的run方法執行任務
task.run();

(6)呼叫執行任務後執行的邏輯

//執行任務後執行的邏輯
afterExecute(task, thrown);

(7)將完成的任務設定為空,完成的任務數量加1並釋放工作執行緒的鎖。

//任務執行完成後,將其設定為空
task = null;
//完成的任務數量加1
w.completedTasks++;
//釋放工作執行緒獲得的鎖
w.unlock();

(8)退出Worker執行緒的執行,如下所示

//執行退出Worker執行緒的邏輯
processWorkerExit(w, completedAbruptly);

從程式碼分析上可以看到,當從Worker執行緒中獲取的任務為空時,會呼叫getTask()方法從任務佇列中獲取任務,接下來,我們看下getTask()方法的實現。

getTask()方法

我們先來看下getTask()方法的原始碼,如下所示。

private Runnable getTask() {
	//輪詢是否超時的標識
	boolean timedOut = false;
	//自旋for迴圈
	for (;;) {
		//獲取ctl
		int c = ctl.get();
		//獲取執行緒池的狀態
		int rs = runStateOf(c);

		//檢測任務佇列是否線上程池停止或關閉的時候為空
		//也就是說任務佇列是否線上程池未正常執行時為空
		if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
			//減少Worker執行緒的數量
			decrementWorkerCount();
			return null;
		}
		//獲取執行緒池中執行緒的數量
		int wc = workerCountOf(c);

		//檢測當前執行緒池中的執行緒數量是否大於corePoolSize的值或者是否正在等待執行任務
		boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

		//如果執行緒池中的執行緒數量大於corePoolSize
		//獲取大於corePoolSize或者是否正在等待執行任務並且輪詢超時
		//並且當前執行緒池中的執行緒數量大於1或者任務佇列為空
		if ((wc > maximumPoolSize || (timed && timedOut))
			&& (wc > 1 || workQueue.isEmpty())) {
			//成功減少執行緒池中的工作執行緒數量
			if (compareAndDecrementWorkerCount(c))
				return null;
			continue;
		}

		try {
			//從任務佇列中獲取任務
			Runnable r = timed ?
				workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
				workQueue.take();
			//任務不為空直接返回任務
			if (r != null)
				return r;
			timedOut = true;
		} catch (InterruptedException retry) {
			timedOut = false;
		}
	}
}

getTask()方法的邏輯比較簡單,大家看原始碼就可以了,我這裡就不重複描述了。

接下來,我們看下在正式呼叫Runnable的run()方法前後,執行的beforeExecute方法和afterExecute方法。

beforeExecute(Thread, Runnable)方法

beforeExecute(Thread, Runnable)方法的原始碼如下所示。

protected void beforeExecute(Thread t, Runnable r) { }

可以看到,beforeExecute(Thread, Runnable)方法的方法體為空,我們可以建立ThreadPoolExecutor的子類來重寫beforeExecute(Thread, Runnable)方法,使得執行緒池正式執行任務之前,執行我們自己定義的業務邏輯。

afterExecute(Runnable, Throwable)方法

afterExecute(Runnable, Throwable)方法的原始碼如下所示。

protected void afterExecute(Runnable r, Throwable t) { }

可以看到,afterExecute(Runnable, Throwable)方法的方法體同樣為空,我們可以建立ThreadPoolExecutor的子類來重寫afterExecute(Runnable, Throwable)方法,使得執行緒池在執行任務之後執行我們自己定義的業務邏輯。

接下來,就是退出工作執行緒的processWorkerExit(Worker, boolean)方法。

processWorkerExit(Worker, boolean)方法

processWorkerExit(Worker, boolean)方法的邏輯主要是執行退出Worker執行緒,並且對一些資源進行清理,原始碼如下所示。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
	//執行過程中出現了異常,突然中斷
	if (completedAbruptly)
		//將工作執行緒的數量減1
		decrementWorkerCount();
	//獲取全域性鎖
	final ReentrantLock mainLock = this.mainLock;
	mainLock.lock();
	try {
		//累加完成的任務數量
		completedTaskCount += w.completedTasks;
		//將完成的任務從workers集合中移除
		workers.remove(w);
	} finally {
		//釋放鎖
		mainLock.unlock();
	}
	//嘗試終止工作執行緒的執行
	tryTerminate();
	//獲取ctl
	int c = ctl.get();
	//判斷當前執行緒池的狀態是否小於STOP(RUNNING或者SHUTDOWN)
	if (runStateLessThan(c, STOP)) {
		//如果沒有突然中斷完成
		if (!completedAbruptly) {
			//如果allowCoreThreadTimeOut為true,為min賦值為0,否則賦值為corePoolSize
			int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
			//如果min為0並且工作佇列不為空
			if (min == 0 && ! workQueue.isEmpty())
				//min的值設定為1
				min = 1;
			//如果執行緒池中的執行緒數量大於min的值
			if (workerCountOf(c) >= min)
				//返回,不再執行程式
				return; 
		}
		//呼叫addWorker方法
		addWorker(null, false);
	}
}

接下來,我們拆解processWorkerExit(Worker, boolean)方法。

(1)執行過程中出現了異常,突然中斷執行,則將工作執行緒數量減1,如下所示。

//執行過程中出現了異常,突然中斷
if (completedAbruptly)
	//將工作執行緒的數量減1
	decrementWorkerCount();

(2)獲取鎖累加完成的任務數量,並將完成的任務從workers集合中移除,並釋放,如下所示。

//獲取全域性鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
	//累加完成的任務數量
	completedTaskCount += w.completedTasks;
	//將完成的任務從workers集合中移除
	workers.remove(w);
} finally {
	//釋放鎖
	mainLock.unlock();
}

(3)嘗試終止工作執行緒的執行

//嘗試終止工作執行緒的執行
tryTerminate();

(4)處判斷當前執行緒池中的執行緒個數是否小於核心執行緒數,如果是,需要新增一個執行緒保證有足夠的執行緒可以執行任務佇列中的任務或者提交的任務。

//獲取ctl
int c = ctl.get();
//判斷當前執行緒池的狀態是否小於STOP(RUNNING或者SHUTDOWN)
if (runStateLessThan(c, STOP)) {
	//如果沒有突然中斷完成
	if (!completedAbruptly) {
		//如果allowCoreThreadTimeOut為true,為min賦值為0,否則賦值為corePoolSize
		int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
		//如果min為0並且工作佇列不為空
		if (min == 0 && ! workQueue.isEmpty())
			//min的值設定為1
			min = 1;
		//如果執行緒池中的執行緒數量大於min的值
		if (workerCountOf(c) >= min)
			//返回,不再執行程式
			return; 
	}
	//呼叫addWorker方法
	addWorker(null, false);
}

接下來,我們看下tryTerminate()方法。

tryTerminate()方法

tryTerminate()方法的原始碼如下所示。

final void tryTerminate() {
	//自旋for迴圈
	for (;;) {
		//獲取ctl
		int c = ctl.get();
		//如果執行緒池的狀態為RUNNING
		//或者狀態大於TIDYING
		//或者狀態為SHUTDOWN並且任務佇列為空
		//直接返回程式,不再執行後續邏輯
		if (isRunning(c) ||
			runStateAtLeast(c, TIDYING) ||
			(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
			return;
		//如果當前執行緒池中的執行緒數量不等於0
		if (workerCountOf(c) != 0) { 
			//中斷執行緒的執行
			interruptIdleWorkers(ONLY_ONE);
			return;
		}
		//獲取執行緒池的全域性鎖
		final ReentrantLock mainLock = this.mainLock;
		mainLock.lock();
		try {
			//通過CAS將執行緒池的狀態設定為TIDYING
			if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
				try {
					//呼叫terminated()方法
					terminated();
				} finally {
					//將執行緒池狀態設定為TERMINATED
					ctl.set(ctlOf(TERMINATED, 0));
					//喚醒所有因為呼叫執行緒池的awaitTermination方法而被阻塞的執行緒
					termination.signalAll();
				}
				return;
			}
		} finally {
			//釋放鎖
			mainLock.unlock();
		}
	}
}

(1)獲取ctl,根據情況設定執行緒池狀態或者中斷執行緒的執行,並返回。

//獲取ctl
int c = ctl.get();
//如果執行緒池的狀態為RUNNING
//或者狀態大於TIDYING
//或者狀態為SHUTDOWN並且任務佇列為空
//直接返回程式,不再執行後續邏輯
if (isRunning(c) ||
	runStateAtLeast(c, TIDYING) ||
	(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
	return;
//如果當前執行緒池中的執行緒數量不等於0
if (workerCountOf(c) != 0) { 
	//中斷執行緒的執行
	interruptIdleWorkers(ONLY_ONE);
	return;
}

(2)獲取全域性鎖,通過CAS設定執行緒池的狀態,呼叫terminated()方法執行邏輯,最終將執行緒池的狀態設定為TERMINATED,喚醒所有因為呼叫執行緒池的awaitTermination方法而被阻塞的執行緒,最終釋放鎖,如下所示。

//獲取執行緒池的全域性
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
	//通過CAS將執行緒池的狀態設定為TIDYING
	if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
		try {
			//呼叫terminated()方法
			terminated();
		} finally {
			//將執行緒池狀態設定為TERMINATED
			ctl.set(ctlOf(TERMINATED, 0));
			//喚醒所有因為呼叫執行緒池的awaitTermination方法而被阻塞的執行緒
			termination.signalAll();
		}
		return;
	}
} finally {
	//釋放鎖
	mainLock.unlock();
}

接下來,看下terminated()方法。

terminated()方法

terminated()方法的原始碼如下所示。

protected void terminated() { }

可以看到,terminated()方法的方法體為空,我們可以建立ThreadPoolExecutor的子類來重寫terminated()方法,值得Worker執行緒呼叫tryTerminate()方法時執行我們自己定義的terminated()方法的業務邏輯。

好了,今天就到這兒吧,我是冰河,我們下期見~~