【高並行】深度解析ScheduledThreadPoolExecutor類的原始碼

2022-10-24 12:01:58

在【高並行專題】的專欄中,我們深度分析了ThreadPoolExecutor類的原始碼,而ScheduledThreadPoolExecutor類是ThreadPoolExecutor類的子類。今天我們就來一起手撕ScheduledThreadPoolExecutor類的原始碼。

構造方法

我們先來看下ScheduledThreadPoolExecutor的構造方法,原始碼如下所示。

public ScheduledThreadPoolExecutor(int corePoolSize) {
	super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
}

public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
	super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
		  new DelayedWorkQueue(), threadFactory);
}

public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
	super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
		  new DelayedWorkQueue(), handler);
}

public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
	super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
		  new DelayedWorkQueue(), threadFactory, handler);
}

從程式碼結構上來看,ScheduledThreadPoolExecutor類是ThreadPoolExecutor類的子類,ScheduledThreadPoolExecutor類的構造方法實際上呼叫的是ThreadPoolExecutor類的構造方法。

schedule方法

接下來,我們看一下ScheduledThreadPoolExecutor類的schedule方法,原始碼如下所示。

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
	//如果傳遞的Runnable物件和TimeUnit時間單位為空
	//丟擲空指標異常
	if (command == null || unit == null)
		throw new NullPointerException();
	//封裝任務物件,在decorateTask方法中直接返回ScheduledFutureTask物件
	RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));
	//執行延時任務
	delayedExecute(t);
	//返回任務
	return t;
}

public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) 
	//如果傳遞的Callable物件和TimeUnit時間單位為空
	//丟擲空指標異常
	if (callable == null || unit == null)
		throw new NullPointerException();
	//封裝任務物件,在decorateTask方法中直接返回ScheduledFutureTask物件
	RunnableScheduledFuture<V> t = decorateTask(callable,
		new ScheduledFutureTask<V>(callable, triggerTime(delay, unit)));
	//執行延時任務
	delayedExecute(t);
	//返回任務
	return t;
}

從原始碼可以看出,ScheduledThreadPoolExecutor類提供了兩個過載的schedule方法,兩個schedule方法的第一個引數不同。可以傳遞Runnable介面物件,也可以傳遞Callable介面物件。在方法內部,會將Runnable介面物件和Callable介面物件封裝成RunnableScheduledFuture物件,本質上就是封裝成ScheduledFutureTask物件。並通過delayedExecute方法來執行延時任務。

在原始碼中,我們看到兩個schedule都呼叫了decorateTask方法,接下來,我們就看看decorateTask方法。

decorateTask方法

decorateTask方法原始碼如下所示。

protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
	return task;
}

protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) {
	return task;
}

通過原始碼可以看出decorateTask方法的實現比較簡單,接收一個Runnable介面物件或者Callable介面物件和封裝的RunnableScheduledFuture任務,兩個方法都是將RunnableScheduledFuture任務直接返回。在ScheduledThreadPoolExecutor類的子類中可以重寫這兩個方法。

接下來,我們繼續看下scheduleAtFixedRate方法。

scheduleAtFixedRate方法

scheduleAtFixedRate方法原始碼如下所示。

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
	//傳入的Runnable物件和TimeUnit為空,則丟擲空指標異常
	if (command == null || unit == null)
		throw new NullPointerException();
	//如果執行週期period傳入的數值小於或者等於0
	//丟擲非法引數異常
	if (period <= 0)
		throw new IllegalArgumentException();
	//將Runnable物件封裝成ScheduledFutureTask任務,
	//並設定執行週期
	ScheduledFutureTask<Void> sft =
		new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period));
	//呼叫decorateTask方法,本質上還是直接返回ScheduledFutureTask物件
	RunnableScheduledFuture<Void> t = decorateTask(command, sft);
	//設定執行的任務
	sft.outerTask = t;
	//執行延時任務
	delayedExecute(t);
	//返回執行的任務
	return t;
}

通過原始碼可以看出,scheduleAtFixedRate方法將傳遞的Runnable物件封裝成ScheduledFutureTask任務物件,並設定了執行週期,下一次的執行時間相對於上一次的執行時間來說,加上了period時長,時長的具體單位由TimeUnit決定。採用固定的頻率來執行定時任務。

ScheduledThreadPoolExecutor類中另一個定時排程任務的方法是scheduleWithFixedDelay方法,接下來,我們就一起看看scheduleWithFixedDelay方法。

scheduleWithFixedDelay方法

scheduleWithFixedDelay方法的原始碼如下所示。

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
	//傳入的Runnable物件和TimeUnit為空,則丟擲空指標異常
	if (command == null || unit == null)
		throw new NullPointerException();
	//任務延時時長小於或者等於0,則丟擲非法引數異常
	if (delay <= 0)
		throw new IllegalArgumentException();
	//將Runnable物件封裝成ScheduledFutureTask任務
	//並設定固定的執行週期來執行任務
	ScheduledFutureTask<Void> sft =
		new ScheduledFutureTask<Void>(command, null,triggerTime(initialDelay, unit), unit.toNanos(-delay));
	//呼叫decorateTask方法,本質上直接返回ScheduledFutureTask任務
	RunnableScheduledFuture<Void> t = decorateTask(command, sft);
	//設定執行的任務
	sft.outerTask = t;
	//執行延時任務
	delayedExecute(t);
	//返回任務
	return t;
}

從scheduleWithFixedDelay方法的原始碼,我們可以看出在將Runnable物件封裝成ScheduledFutureTask時,設定了執行週期,但是此時設定的執行週期與scheduleAtFixedRate方法設定的執行週期不同。此時設定的執行週期規則為:下一次任務執行的時間是上一次任務完成的時間加上delay時長,時長單位由TimeUnit決定。也就是說,具體的執行時間不是固定的,但是執行的週期是固定的,整體採用的是相對固定的延遲來執行定時任務。

如果大家細心的話,會發現在scheduleWithFixedDelay方法中設定執行週期時,傳遞的delay值為負數,如下所示。

ScheduledFutureTask<Void> sft =
		new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay));

這裡的負數表示的是相對固定的延遲。

在ScheduledFutureTask類中,存在一個setNextRunTime方法,這個方法會在run方法執行完任務後呼叫,這個方法更能體現scheduleAtFixedRate方法和scheduleWithFixedDelay方法的不同,setNextRunTime方法的原始碼如下所示。

private void setNextRunTime() {
	//距離下次執行任務的時長
	long p = period;
	//固定頻率執行,
	//上次執行任務的時間
	//加上任務的執行週期
	if (p > 0)
		time += p;
	//相對固定的延遲
	//使用的是系統當前時間
	//加上任務的執行週期
	else
		time = triggerTime(-p);
}

在setNextRunTime方法中通過對下次執行任務的時長進行判斷來確定是固定頻率執行還是相對固定的延遲。

triggerTime方法

在ScheduledThreadPoolExecutor類中提供了兩個triggerTime方法,用於獲取下一次執行任務的具體時間。triggerTime方法的原始碼如下所示。

private long triggerTime(long delay, TimeUnit unit) {
	return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}

long triggerTime(long delay) {
	return now() +
		((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

這兩個triggerTime方法的程式碼比較簡單,就是獲取下一次執行任務的具體時間。有一點需要注意的是:delay < (Long.MAX_VALUE >> 1判斷delay的值是否小於Long.MAX_VALUE的一半,如果小於Long.MAX_VALUE值的一半,則直接返回delay,否則需要處理溢位的情況。

我們看到在triggerTime方法中處理防止溢位的邏輯使用了overflowFree方法,接下來,我們就看看overflowFree方法的實現。

overflowFree方法

overflowFree方法的原始碼如下所示。

private long overflowFree(long delay) {
	//獲取佇列中的節點
	Delayed head = (Delayed) super.getQueue().peek();
	//獲取的節點不為空,則進行後續處理
	if (head != null) {
		//從佇列節點中獲取延遲時間
		long headDelay = head.getDelay(NANOSECONDS);
		//如果從佇列中獲取的延遲時間小於0,並且傳遞的delay
		//值減去從佇列節點中獲取延遲時間小於0
		if (headDelay < 0 && (delay - headDelay < 0))
			//將delay的值設定為Long.MAX_VALUE + headDelay
			delay = Long.MAX_VALUE + headDelay;
	}
	//返回延遲時間
	return delay;
}

通過對overflowFree方法的原始碼分析,可以看出overflowFree方法本質上就是為了限制佇列中的所有節點的延遲時間在Long.MAX_VALUE值之內,防止在ScheduledFutureTask類中的compareTo方法中溢位。

ScheduledFutureTask類中的compareTo方法的原始碼如下所示。

public int compareTo(Delayed other) {
	if (other == this) // compare zero if same object
		return 0;
	if (other instanceof ScheduledFutureTask) {
		ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
		long diff = time - x.time;
		if (diff < 0)
			return -1;
		else if (diff > 0)
			return 1;
		else if (sequenceNumber < x.sequenceNumber)
			return -1;
		else
			return 1;
	}
	long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
	return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

compareTo方法的主要作用就是對各延遲任務進行排序,距離下次執行時間靠前的任務就排在前面。

delayedExecute方法

delayedExecute方法是ScheduledThreadPoolExecutor類中延遲執行任務的方法,原始碼如下所示。

private void delayedExecute(RunnableScheduledFuture<?> task) {
	//如果當前執行緒池已經關閉
	//則執行執行緒池的拒絕策略
	if (isShutdown())
		reject(task);
	//執行緒池沒有關閉
	else {
		//將任務新增到阻塞佇列中
		super.getQueue().add(task);
		//如果當前執行緒池是SHUTDOWN狀態
		//並且當前執行緒池狀態下不能執行任務
		//並且成功從阻塞佇列中移除任務
		if (isShutdown() &&
			!canRunInCurrentRunState(task.isPeriodic()) &&
			remove(task))
			//取消任務的執行,但不會中斷執行中的任務
			task.cancel(false);
		else
			//呼叫ThreadPoolExecutor類中的ensurePrestart()方法
			ensurePrestart();
	}
}

可以看到在delayedExecute方法內部呼叫了canRunInCurrentRunState方法,canRunInCurrentRunState方法的原始碼實現如下所示。

boolean canRunInCurrentRunState(boolean periodic) {
	return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown : executeExistingDelayedTasksAfterShutdown);
}

可以看到canRunInCurrentRunState方法的邏輯比較簡單,就是判斷執行緒池當前狀態下能夠執行任務。

另外,在delayedExecute方法內部還呼叫了ThreadPoolExecutor類中的ensurePrestart()方法,接下來,我們看下ThreadPoolExecutor類中的ensurePrestart()方法的實現,如下所示。

void ensurePrestart() {
	int wc = workerCountOf(ctl.get());
	if (wc < corePoolSize)
		addWorker(null, true);
	else if (wc == 0)
		addWorker(null, false);
}

在ThreadPoolExecutor類中的ensurePrestart()方法中,首先獲取當前執行緒池中執行緒的數量,如果執行緒數量小於corePoolSize則呼叫addWorker方法傳遞null和true,如果執行緒數量為0,則呼叫addWorker方法傳遞null和false。

關於addWork()方法的原始碼解析,大家可以參考【高並行專題】中的《高並行之——通過ThreadPoolExecutor類的原始碼深度解析執行緒池執行任務的核心流程》一文,這裡,不再贅述。

reExecutePeriodic方法

reExecutePeriodic方法的原始碼如下所示。

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
	//執行緒池當前狀態下能夠執行任務
	if (canRunInCurrentRunState(true)) {
		//將任務放入佇列
		super.getQueue().add(task);
		//執行緒池當前狀態下不能執行任務,並且成功移除任務
		if (!canRunInCurrentRunState(true) && remove(task))
			//取消任務
			task.cancel(false);
		else
			//呼叫ThreadPoolExecutor類的ensurePrestart()方法
			ensurePrestart();
	}
}

總體來說reExecutePeriodic方法的邏輯比較簡單,但是,這裡需要注意和delayedExecute方法的不同點:呼叫reExecutePeriodic方法的時候已經執行過一次任務,所以,並不會觸發執行緒池的拒絕策略;傳入reExecutePeriodic方法的任務一定是週期性的任務。

onShutdown方法

onShutdown方法是ThreadPoolExecutor類中的勾點函數,它是在ThreadPoolExecutor類中的shutdown方法中呼叫的,而在ThreadPoolExecutor類中的onShutdown方法是一個空方法,如下所示。

void onShutdown() {
}

ThreadPoolExecutor類中的onShutdown方法交由子類實現,所以ScheduledThreadPoolExecutor類覆寫了onShutdown方法,實現了具體的邏輯,ScheduledThreadPoolExecutor類中的onShutdown方法的原始碼實現如下所示。

@Override
void onShutdown() {
	//獲取佇列
	BlockingQueue<Runnable> q = super.getQueue();
	//線上程池已經呼叫shutdown方法後,是否繼續執行現有延遲任務
	boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
	//線上程池已經呼叫shutdown方法後,是否繼續執行現有定時任務
	boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
	//線上程池已經呼叫shutdown方法後,不繼續執行現有延遲任務和定時任務
	if (!keepDelayed && !keepPeriodic) {
		//遍歷佇列中的所有任務
		for (Object e : q.toArray())
			//取消任務的執行
			if (e instanceof RunnableScheduledFuture<?>)
				((RunnableScheduledFuture<?>) e).cancel(false);
		//清空佇列
		q.clear();
	}
	//線上程池已經呼叫shutdown方法後,繼續執行現有延遲任務和定時任務
	else {
		//遍歷佇列中的所有任務
		for (Object e : q.toArray()) {
			//當前任務是RunnableScheduledFuture型別
			if (e instanceof RunnableScheduledFuture) {
				//將任務強轉為RunnableScheduledFuture型別
				RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
				//線上程池呼叫shutdown方法後不繼續的延遲任務或週期任務
				//則從佇列中刪除並取消任務
				if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
					t.isCancelled()) {
					if (q.remove(t))
						t.cancel(false);
				}
			}
		}
	}
	//最終呼叫tryTerminate()方法
	tryTerminate();
}

ScheduledThreadPoolExecutor類中的onShutdown方法的主要邏輯就是先判斷執行緒池呼叫shutdown方法後,是否繼續執行現有的延遲任務和定時任務,如果不再執行,則取消任務並清空佇列;如果繼續執行,將佇列中的任務強轉為RunnableScheduledFuture物件之後,從佇列中刪除並取消任務。大家需要好好理解這兩種處理方式。最後呼叫ThreadPoolExecutor類的tryTerminate方法。有關ThreadPoolExecutor類的tryTerminate方法的原始碼解析,大家可以參考【高並行專題】中的《高並行之——通過原始碼深度分析執行緒池中Worker執行緒的執行流程》一文,這裡不再贅述。

至此,ScheduledThreadPoolExecutor類中的核心方法的原始碼,我們就分析完了。