在日常開發中存在著排程延時任務、定時任務的需求,而jdk中提供了兩種基於記憶體的任務排程工具,即相對早期的java.util.Timer類和java.util.concurrent中的ScheduledThreadPoolExecutor。
Timer類其底層基於完全二元堆積實現的優先順序佇列,使得當前最早應該被執行的任務始終保持在佇列頭,並能以O(log n)對數的時間複雜度完成任務的入隊和出隊。
Timer是比較早被引入jdk的,其只支援單執行緒處理任務,因此如果先被處理的任務比較耗時便會阻塞後續任務的執行,進而導致任務排程不夠及時(比如本來10分鐘後要執行的任務,可能被前一個耗時的任務拖延到15分鐘後才執行)
Timer排程器的改進版本ScheduledThreadPoolExecutor在jdk1.5中隨著juc包一起被引入。
ScheduledThreadPoolExecutor是建立在二元堆積優先順序佇列和juc的ThreadPoolExecutor基礎之上的,如果對兩者工作原理不甚瞭解的話,會嚴重影響對ScheduledThreadPoolExecutor的理解。
在展開分析ScheduledThreadPoolExecutor的原始碼之前,先思考幾個問題。帶著問題去閱讀原始碼會更有效率。
為了加深理解和新增註釋,我基於jdk的ScheduledThreadPoolExecutor自己重新實現了一遍。所有的類名都在jdk類的基礎上加上My字尾,便於區分。
綜上所述,ScheduledThreadPoolExecutor作為ThreadPoolExecutor的子類,大量複用了ThreadPoolExecutor中的邏輯,主要提供了一個客製化化的工作佇列外就很巧妙地實現了多執行緒並行的任務排程功能。
/**
* MyScheduledThreadPoolExecutor
* */
public class MyScheduledThreadPoolExecutor extends MyThreadPoolExecutorV2 implements MyScheduledExecutorService {
/**
* 單調自增發號器,為每一個新建立的ScheduledFutureTask設定一個唯一的序列號
* */
private static final AtomicLong sequencer = new AtomicLong();
/**
* 取消任務時,是否需要將其從延遲佇列中移除掉
* True if ScheduledFutureTask.cancel should remove from queue
* */
private volatile boolean removeOnCancel = false;
/**
* False if should cancel/suppress periodic tasks on shutdown.
*/
private volatile boolean continueExistingPeriodicTasksAfterShutdown = false;
/**
* False if should cancel non-periodic tasks on shutdown.
*/
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
/**
* 比父類別ThreadPoolExecutor相對受限的建構函式
* */
public MyScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, MyRejectedExecutionHandler handler) {
// 1. 只能使用內部的DelayedWorkQueue作為工作佇列。
// DelayedWorkQueue是無界佇列,只需要指定corePoolSize即可,maximumPoolSize沒用(核心執行緒不夠用就全部在佇列裡積壓著等慢慢消費)
// 2. corePoolSize決定了ScheduledThreadPoolExecutor處理任務的及時性。核心執行緒越多處理任務就越及時,越不容易被非常耗時的任務影響排程的實時性,但也越消耗系統資源。
// 3. keepAliveTime=0,一般來說核心執行緒是不應該退出的,除非父類別裡allowCoreThreadTimeOut被設定為true了
// 那樣沒有任務時核心執行緒就會立即被回收了(keepAliveTime=0, allowCoreThreadTimeOut=true)
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new MyDelayedWorkQueue(), threadFactory, handler);
}
}
/**
* 排程任務物件
* */
private class MyScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V>{
/**
* 用於保證相同延遲時間的任務,其FIFO(先提交的先執行)的特性
* */
private final long sequenceNumber;
/**
* 當前任務下一次執行的時間(絕對時間,單位納秒nanos)
* */
private long time;
/**
* 需要重複執行的任務使用的屬性
* 1 period>0,說明該任務是一個固定週期重複執行的任務(通過scheduleAtFixedRate方法提交)
* 2 period<0,說明該任務是一個固定延遲重複執行的任務(通過scheduleWithFixedDelay方法提交)
* 3 period=0,說明該任務是一個一次性執行的任務(通過schedule方法提交)
* */
private final long period;
/**
* 定期任務實際執行的具體任務
* */
RunnableScheduledFuture<V> outerTask = this;
/**
* 基於二元堆積的延遲佇列中的陣列下標,用於快速的查詢、定位
* */
int heapIndex;
/**
* 一次性任務的建構函式(one action)
* */
MyScheduledFutureTask(Runnable runnable, V result, long ns) {
super(runnable, result);
// 下一次執行的時間
this.time = ns;
// 非週期性任務,period設定為0
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
/**
* 週期性任務的建構函式
*/
MyScheduledFutureTask(Runnable runnable, V result, long ns, long period) {
super(runnable, result);
// 下一次執行的時間
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
@Override
public boolean isPeriodic() {
// period為0代表是一次性任務
// period部位0代表是週期性任務
return period != 0;
}
/**
* 獲得下一次執行的時間
* */
@Override
public long getDelay(TimeUnit unit) {
// 獲得time屬性與當前時間之差
long delay = time - System.nanoTime();
// 基於引數unit轉換
return unit.convert(delay, NANOSECONDS);
}
/**
* 用於延遲佇列中的優先順序佇列的大小比較
* 基於time比較
* 1. time越小,值越大(越早應該被排程執行的任務,越靠前)
* 2. time相等就進一步比較sequenceNumber(排程時間一致的)
* */
@Override
public int compareTo(Delayed other) {
if (other == this) {
// 同一個物件是相等的,返回0
return 0;
}
if (other instanceof MyScheduledFutureTask) {
// 同樣是ScheduledFutureTask
MyScheduledFutureTask<?> x = (MyScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0) {
// 當前物件延遲時間更小,返回-1
return -1;
} else if (diff > 0) {
// 當前物件延遲時間更大,返回1
return 1;
} else if (sequenceNumber < x.sequenceNumber) {
// 延遲時間相等,比較序列號
// 當前物件序列號更小,需要排更前面返回-1
return -1;
} else {
// 當前物件序列號更大,返回1
return 1;
}
}else{
// 不是ScheduledFutureTask,通過getDelay比較延遲時間
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
// return (diff < 0) ? -1 : (diff > 0) ? 1 : 0
if(diff < 0){
// 當前物件延遲時間小,返回-1
return -1;
}else if(diff > 0){
// 當前物件延遲時間大,返回1
return 1;
}else{
// 延遲時間相等返回0
return 0;
}
}
}
}
DelayedWorkQueue的實現機制基本上等價於juc包下的PriorityQueue加DelayQueue。如果可以的話建議讀者在理解了PriorityQueue、DelayQueue原理之後再來學習其工作機制,循序漸進而事半功倍。
很多實現的小細節都在MyDelayedWorkQueue中有詳細的註釋(比如二元堆積的插入、刪除,以及延遲佇列中getDelay值更小的任務入隊時應該怎麼處理等)。
/**
* 為排程任務執行緒池ScheduledThreadPoolExecutor專門客製化的工作佇列
* 1.基於完全二元堆積結構,令執行時間最小(最近)的任務始終位於堆頂(即佇列頭) ==> 小頂堆
* 2.實現上綜合了juc包下的DelayQueue和PriorityQueue的功能,並加上了一些基於ScheduledThreadPoolExecutor的一些邏輯
* 建議讀者在理解了PriorityQueue、DelayQueue原理之後再來學習其工作機制,循序漸進而事半功倍
* */
static class MyDelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
/**
* 完全二元堆積底層陣列的初始容量
* */
private static final int INITIAL_CAPACITY = 16;
/**
* 完全二元堆積底層陣列
* */
private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
/**
* 互斥鎖,用於入隊等操作時的並行控制
* */
private final ReentrantLock lock = new ReentrantLock();
/**
* 佇列中任務元素的數量
* */
private int size = 0;
/**
* 等待執行佇列頭(最早應該執行的)任務的執行緒池工作執行緒
* 為什麼會引入一個這個呢?是為了減少其它執行緒take獲取新任務時不必要的等待
* 因為額外引入了一個作業系統層面的定時器,await帶超時時間比無限時間的await效能要差一些
* */
private Thread leader = null;
/**
* 當一個佇列頭部的任務可以被執行時,通知等待在available上的工作執行緒
* */
private final Condition available = lock.newCondition();
// ============================= 內部private的輔助方法 ================================
private void setIndex(RunnableScheduledFuture<?> f, int index) {
if (f instanceof MyScheduledFutureTask) {
// 如果任務物件是MyScheduledFutureTask型別,而不僅僅是RunnableScheduledFuture
// 則設定index屬性便於加速查詢
((MyScheduledFutureTask<?>) f).heapIndex = index;
}
}
/**
* 第k位的元素在二元堆積中上濾(小頂堆:最小的元素在堆頂)
* Call only when holding lock.
*
* 當新元素插入完全二元堆積時,我們直接將其插入向量末尾(堆底最右側),此時新元素的優先順序可能會大於其雙親元素甚至祖先元素,破壞了堆序性,
* 因此我們需要對插入的新元素進行一次上濾操作,使完全二元堆積恢復堆序性。
* 由於堆序性只和雙親和孩子節點相關,因此堆中新插入元素的非祖先元素的堆序性不會受到影響,上濾只是一個區域性性的行為。
* */
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
// 獲得第k個節點邏輯上的雙親節點
// 0
// 1 2
// 3 4 5 6
// (下標減1再除2,比如下標為5和6的元素邏輯上的parent就是下標為2的元素)
int parent = (k - 1) >>> 1;
// 拿到雙親節點對應的元素
RunnableScheduledFuture<?> e = queue[parent];
if (key.compareTo(e) >= 0) {
// 如果當前需要上濾的元素key,其值大於或等於雙親節點就停止上濾過程(小頂堆)
break;
}
// 當前上濾的元素key,其值小於雙親節點
// 將雙親節點換下來,到第k位上(把自己之前的位置空出來)
queue[k] = e;
// 設定被換下來的雙親節點的index值
setIndex(e, k);
// 令k下標變為更小的parent,繼續嘗試下一輪上濾操作
k = parent;
}
// 上濾判斷結束後,最後空出來的parent的下標值對應的位置存放上濾的元素key
queue[k] = key;
// 設定key節點的index值
setIndex(key, k);
}
/**
* 第k位的元素在二元堆積中下濾(小頂堆:最小的元素在堆頂)
* Call only when holding lock.
*
* 當優先順序佇列中極值元素出隊時,需要在滿足堆序性的前提下,選出新的極值元素。
* */
private void siftDown(int k, RunnableScheduledFuture<?> key) {
// half為size的一半
int half = size >>> 1;
// k小於half才需要下濾,大於half說明第k位元素已經是葉子節點了,不需要繼續下濾了
while (k < half) {
// 獲得第k位元素邏輯上的左孩子節點的下標
int child = (k << 1) + 1;
// 獲得左孩子的元素
RunnableScheduledFuture<?> c = queue[child];
// 獲得第k位元素邏輯上的右孩子節點的下標
int right = child + 1;
// right沒有越界,則比較左右孩子值的大小
if (right < size && c.compareTo(queue[right]) > 0) {
// 左孩子大於右孩子,所以用右孩子和key比較,c=右孩子節點
// (if條件不滿足,則用左孩子和key比較,c=左孩子節點)
c = queue[child = right];
}
// key和c比較,如果key比左右孩子都小,則結束下濾
if (key.compareTo(c) <= 0) {
break;
}
// key大於左右孩子中更小的那個,則第k位換成更小的那個孩子(保證上層的節點永遠小於其左右孩子,保證堆序性)
queue[k] = c;
// 設定被換到上層去的孩子節點的index的值
setIndex(c, k);
// 令下標k變大為child,在迴圈中嘗試更下一層的下濾操作
k = child;
}
// 結束了下濾操作,最後將元素key放到最後被空出來的孩子節點原來的位置
queue[k] = key;
// 設定key的index值
setIndex(key, k);
}
/**
* 二元堆積擴容
* Call only when holding lock.
*/
private void grow() {
int oldCapacity = queue.length;
// 在原有基礎上擴容50%
int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
if (newCapacity < 0) {
// 處理擴容50%後整型溢位
newCapacity = Integer.MAX_VALUE;
}
// 將原來陣列中的陣列資料複製到新資料
// 令成員變數queue指向擴容後的新陣列
queue = Arrays.copyOf(queue, newCapacity);
}
/**
* 查詢x在二元堆積中的陣列下標
* @return 找到了就返回具體的下標,沒找到返回-1
* */
private int indexOf(Object x) {
if(x == null){
// 為空,直接返回-1
return -1;
}
if (x instanceof MyScheduledFutureTask) {
int i = ((MyScheduledFutureTask) x).heapIndex;
// 為什麼不直接以x.heapIndex為準?
// 因為可能物件x來自其它的執行緒池,而不是本執行緒池的
// (先判斷i是否合法,然後判斷第heapIndex個是否就是x)
if (i >= 0 && i < size && queue[i] == x) {
// 比對一下第heapIndex項是否就是x,如果是則直接返回
return i;
}else{
// heapIndex不合法或者queue[i] != x不相等,說明不是本執行緒池的任務物件,返回-1
return -1;
}
} else {
// 非ScheduledFutureTask,從頭遍歷到尾進行線性的檢查
for (int i = 0; i < size; i++) {
// 如果x和第i個相等,則返回i
if (x.equals(queue[i])) {
return i;
}
}
// 遍歷完了整個queue都沒找到,返回-1
return -1;
}
}
// ============================= 實現介面定義的方法 ======================================
@Override
public boolean contains(Object x) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 不為-1就是存在,返回true
// 反之就是不存在,返回false
return indexOf(x) != -1;
} finally {
lock.unlock();
}
}
@Override
public boolean remove(Object x) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = indexOf(x);
if (i < 0) {
// x不存在,直接返回
return false;
}
// x存在,先將其index設定為-1
setIndex(queue[i], -1);
// 二元堆積元素數量自減1
int s = --size;
// 將佇列最尾端的元素取出
RunnableScheduledFuture<?> replacement = queue[s];
queue[s] = null;
// s == i,說明被刪除的是最尾端的元素,移除後沒有破壞堆序性,直接返回即可
if (s != i) {
// 將佇列最尾端的元素放到被移除元素的位置,進行一次下濾
siftDown(i, replacement);
if (queue[i] == replacement) {
// 這裡為什麼還要上濾一次呢?其實是最尾端的元素replacement在放到第i個位置上執行下濾後,其雖然保證了小於其左右孩子節點,但依然可能大於其雙親節點
// 舉個例子:
// 0
// 10 1
// 20 30 2 3
// 40 50 60 70 4 5 6 7
// 如果刪除第3排第一個的20,則siftDown後會變成:
// 0
// 10 1
// 7 30 2 3
// 40 50 60 70 4 5 6
// replacement=7是小於其雙親節點10的,因此需要再進行一次上濾,使得最終結果為:
// 0
// 7 1
// 10 30 2 3
// 40 50 60 70 4 5 6
// 這種需要上濾的情況是相對特殊的,只有當下濾只有這個節點沒有動(即下濾後queue[i] == replacement)
// 因為這種情況下replacement不進行上濾的話**可能**小於其雙親節點,而違反了堆序性(heap invariant)
// 而如果下濾後移動了位置(queue[i] != replacement),則其必定大於其雙親節點,因此不需要嘗試上濾了
siftUp(i, replacement);
// 額外的:
// 最容易理解的實現刪除堆中元素的方法是將replacement置於堆頂(即第0個位置),進行一次時間複雜度為O(log n)的完整下濾而恢復堆序性
// 但與ScheduledThreadExecutor在第i個位置上進行下濾操作的演演算法相比其時間複雜度是更高的
// jdk的實現中即使下濾完成後再進行一次上濾,其**最差情況**也與從堆頂開始下濾的演演算法的效能一樣。雖然難理解一些但卻是更高效的堆元素刪除演演算法
// 在remove方法等移除佇列中間元素時,會比從堆頂直接下濾效率高
}
}
return true;
} finally {
lock.unlock();
}
}
/**
* 入隊操作
* */
@Override
public boolean offer(Runnable x) {
if (x == null) {
throw new NullPointerException();
}
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length) {
// 容量不足,擴容
grow();
}
size = i + 1;
if (i == 0) {
// 佇列此前為空,第0位設定就ok了
queue[0] = e;
setIndex(e, 0);
} else {
// 佇列此前不為空,加入隊尾進行一次上濾,恢復堆序性
siftUp(i, e);
}
// 插入堆後,發現自己是佇列頭(最早要執行的任務)
if (queue[0] == e) {
// 已經有新的佇列頭任務了,leader設定為空
leader = null;
// 通知take時阻塞等待獲取新任務的工作執行緒
available.signal();
}
} finally {
lock.unlock();
}
// 無界佇列,入隊一定成功
return true;
}
/**
* 佇列元素f出隊操作(修改size等資料,並且恢復f移除後的堆序性)
* */
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
// size自減1
int s = --size;
RunnableScheduledFuture<?> x = queue[s];
queue[s] = null;
if (s != 0) {
// 由於佇列頭元素出隊了,把隊尾元素放到隊頭進行一次下濾,以恢復堆序性
siftDown(0, x);
}
// 被移除的元素f,index設定為-1
setIndex(f, -1);
// 返回被移除佇列的元素
return f;
}
/**
* 出隊操作(非阻塞)
* */
@Override
public RunnableScheduledFuture<?> poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
RunnableScheduledFuture<?> first = queue[0];
if (first == null || first.getDelay(NANOSECONDS) > 0) {
// 如果佇列為空或者隊頭元素沒到需要執行的時間點(delay>0),返回null
return null;
} else {
// 返回佇列頭元素,並且恢復堆序性
return finishPoll(first);
}
} finally {
lock.unlock();
}
}
@Override
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// take是可響應中斷的
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null) {
// 佇列為空,await等待(可響應中斷)
available.await();
} else {
// 佇列不為空
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0) {
// 佇列頭的元素delay<=0,到可執行的時間點了,返回即可
return finishPoll(first);
}
// first設定為null,便於await期間提早gc這個臨時變數
first = null; // don't retain ref while waiting
if (leader != null){
// leader不為空,說明已經有別的執行緒在take等待了,await無限等待
// (不帶超時時間的await效能更好一些,佇列頭元素只需要由leader執行緒來獲取就行,其它的執行緒就等leader處理完隊頭任務後將其喚醒)
available.await();
} else {
// leader為空,說明之前沒有別的執行緒在take等待
Thread thisThread = Thread.currentThread();
// 令當前執行緒為leader
leader = thisThread;
try {
// leader await帶超時時間(等待佇列頭的任務delay時間,確保任務可執行時第一時間被喚醒去執行)
available.awaitNanos(delay);
} finally {
if (leader == thisThread) {
// take方法退出,當前執行緒不再是leader了
leader = null;
}
}
}
}
}
} finally {
if (leader == null && queue[0] != null) {
// leader為空,且佇列不為空(比如leader執行緒被喚醒後,通過finishPoll已經獲得了之前的佇列頭元素)
// 嘗試喚醒之前阻塞等待的那些消費者執行緒
available.signal();
}
lock.unlock();
}
}
@Override
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
// pool是可響應中斷的
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null) {
// 佇列為空
if (nanos <= 0) {
// timeout等待時間超時了,返回null(一般不是第一次迴圈)
return null;
} else {
// 佇列元素為空,等待timeout
nanos = available.awaitNanos(nanos);
}
} else {
// 佇列不為空
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0) {
// delay<=0,隊頭元素滿足出隊條件
return finishPoll(first);
}
if (nanos <= 0) {
// 佇列不為空,但是timeout等待時間超時了,返回null(一般不是第一次迴圈)
return null;
}
// first設定為null,便於await期間提早gc這個臨時變數
first = null; // don't retain ref while waiting
if (nanos < delay || leader != null) {
// poll指定的等待時間小於隊頭元素delay的時間,或者leader不為空(之前已經有別的執行緒在等待了撈取任務了)
// 最多等待到timeout
nanos = available.awaitNanos(nanos);
} else {
// 隊頭元素delay的時間早於waitTime指定的時間,且此前leader為null
// 當前執行緒成為leader
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 等待delay時間
long timeLeft = available.awaitNanos(delay);
// 醒來後,nanos自減
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread) {
leader = null;
}
}
}
}
}
} finally {
if (leader == null && queue[0] != null) {
// leader為空,且佇列不為空(比如leader執行緒被喚醒後,通過finishPoll已經獲得了之前的佇列頭元素)
// 嘗試喚醒之前阻塞等待的那些消費者執行緒
available.signal();
}
lock.unlock();
}
}
@Override
public void clear() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
for (int i = 0; i < size; i++) {
RunnableScheduledFuture<?> t = queue[i];
if (t != null) {
// 將佇列內部陣列的值全部設定為null
queue[i] = null;
// 所有任務物件的index都設定為-1
setIndex(t, -1);
}
}
size = 0;
} finally {
lock.unlock();
}
}
/**
* Returns first element only if it is expired.
* Used only by drainTo. Call only when holding lock.
*/
private RunnableScheduledFuture<?> peekExpired() {
RunnableScheduledFuture<?> first = queue[0];
// 如果隊頭元素存在,且已到期(expired) 即delay <= 0,返回隊頭元素,否則返回null
return (first == null || first.getDelay(NANOSECONDS) > 0) ? null : first;
}
@Override
public int drainTo(Collection<? super Runnable> c) {
if (c == null) {
throw new NullPointerException();
}
if (c == this) {
throw new IllegalArgumentException();
}
final ReentrantLock lock = this.lock;
lock.lock();
try {
RunnableScheduledFuture<?> first;
int n = 0;
// 延遲佇列的drainTo只返回已過期的所有元素
while ((first = peekExpired()) != null) {
// 已過期的元素加入引數指定的集合
c.add(first); // In this order, in case add() throws.
// 同時將其從佇列中移除
finishPoll(first);
// 總共遷移元素的個數自增
++n;
}
// 佇列為空,或者佇列頭元素未過期則跳出迴圈
// 返回總共遷移元素的個數
return n;
} finally {
lock.unlock();
}
}
@Override
public Object[] toArray() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 佇列底層本來就是陣列,直接copy一份即可
return Arrays.copyOf(queue, size, Object[].class);
} finally {
lock.unlock();
}
}
}
ScheduledThreadPoolExecutor允許使用者提供三種不同型別的任務:
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
if (command == null || unit == null) {
throw new NullPointerException();
}
// 裝飾任務物件
RunnableScheduledFuture<?> t = decorateTask(command,
new MyScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));
// 提交任務到工作佇列中,以令工作執行緒滿足條件時將其取出來排程執行
delayedExecute(t);
return t;
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
if (command == null || unit == null) {
throw new NullPointerException();
}
if (period <= 0) {
throw new IllegalArgumentException();
}
// 固定週期重複執行的任務,period引數為正數
MyScheduledFutureTask<Void> scheduledFutureTask = new MyScheduledFutureTask<>(
command, null, triggerTime(initialDelay, unit), unit.toNanos(period));
// 裝飾任務物件
RunnableScheduledFuture<Void> t = decorateTask(command, scheduledFutureTask);
// 記錄使用者實際提交的任務物件
scheduledFutureTask.outerTask = t;
// 提交任務到工作佇列中,以令工作執行緒滿足條件時將其取出來排程執行
delayedExecute(t);
return t;
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
// 固定延遲重複執行的任務,period引數為負數
MyScheduledFutureTask<Void> scheduledFutureTask =
new MyScheduledFutureTask<>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay));
// 裝飾任務物件
RunnableScheduledFuture<Void> t = decorateTask(command, scheduledFutureTask);
// 記錄使用者實際提交的任務物件
scheduledFutureTask.outerTask = t;
// 提交任務到工作佇列中,以令工作執行緒滿足條件時將其取出來排程執行
delayedExecute(t);
return t;
}
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown()) {
// 執行緒池已經終止了,執行reject拒絕策略
super.reject(task);
} else {
// 沒有終止,任務在工作佇列中入隊
super.getQueue().add(task);
// 再次檢查狀態,如果執行緒池已經終止則回滾(將任務物件從工作佇列中remove掉,並且當前任務Future執行cancel方法取消掉)
// 在提交任務與執行緒池終止並行時,推進執行緒池儘早到達終結態
if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) {
task.cancel(false);
} else {
// 確保至少有一個工作執行緒會處理當前提交的任務
ensurePrestart();
}
}
}
第一種一次性的延遲任務的排程在前面的章節已經說的比較清楚了,就是簡單的將任務加入客製化的工作佇列,等待執行緒池中的工作執行緒在任務排程時間達到要求時令任務出隊並排程執行即可。
下面我們看看需要反覆被排程的週期性任務是如何被排程的,重點關注之前沒有展示的ScheduledFutureTask任務物件的run方法。
/**
* 排程任務物件
* */
private class MyScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V>{
/**
* 用於保證相同延遲時間的任務,其FIFO(先提交的先執行)的特性
* */
private final long sequenceNumber;
/**
* 當前任務下一次執行的時間(絕對時間,單位納秒nanos)
* */
private long time;
/**
* 需要重複執行的任務使用的屬性
* 1 period>0,說明該任務是一個固定週期重複執行的任務(通過scheduleAtFixedRate方法提交)
* 2 period<0,說明該任務是一個固定延遲重複執行的任務(通過scheduleWithFixedDelay方法提交)
* 3 period=0,說明該任務是一個一次性執行的任務(通過schedule方法提交)
* */
private final long period;
/**
* 定期任務實際執行的具體任務
* */
RunnableScheduledFuture<V> outerTask = this;
/**
* 基於二元堆積的延遲佇列中的陣列下標,用於快速的查詢、定位
* */
int heapIndex;
/**
* 一次性任務的建構函式(one action)
* */
MyScheduledFutureTask(Runnable runnable, V result, long ns) {
super(runnable, result);
// 下一次執行的時間
this.time = ns;
// 非週期性任務,period設定為0
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
/**
* 週期性任務的建構函式
*/
MyScheduledFutureTask(Runnable runnable, V result, long ns, long period) {
super(runnable, result);
// 下一次執行的時間
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
/**
* 設定下一次執行的事件
* */
private void setNextRunTime() {
long p = this.period;
if (p > 0) {
// fixedRate週期性任務,time基礎上單純的加period就能獲得下一次執行的時間
// (不用考慮溢位,因為如果因為time太大而溢位了(long型別溢位說明下一次執行時間是天荒地老),則永遠不會被執行也是合理的)
this.time += p;
} else {
// fixedDelay週期性任務,下一次時間為當前時間+period(當前排程已經執行完成,fixedDelay任務第n+1的執行時間是第n次執行完成後+period)
// 下一次排程的時間(需要處理溢位)
this.time = triggerTime(-p);
}
}
@Override
public void run() {
boolean periodic = isPeriodic();
// 根據當前執行緒池狀態,判斷當前任務是否應該取消(比如已經是STOP了,就應該停止繼續執行了)
if (!canRunInCurrentRunState(periodic)) {
// 不能正常執行,取消掉
cancel(false);
} else if (!periodic) {
// 非週期性任務,當做普通的任務直接run就行了
MyScheduledFutureTask.super.run();
} else if (MyScheduledFutureTask.super.runAndReset()) {
// 注意:runAndReset如果拋異常了,則不會走reExecutePeriodic邏輯重新加入工作佇列,導致這個週期性的任務就不會再被執行了
// If any execution of the task encounters an exception, subsequent executions are suppressed
// 設定下一次執行的事件
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
}
/**
* 嘗試重新提交併執行週期性任務(是屬於ScheduledThreadPoolExecutor的方法)
* */
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
// 當前執行緒池狀態允許執行任務,將任務加入到工作佇列中去
super.getQueue().add(task);
// 再次檢查,如果狀態發生了變化,不允許了,則通過remove方法將剛加入的任務移除掉,實現回滾
// 和ThreadPoolExecutor一致都是為了讓shutdown/stop狀態的執行緒池儘量在狀態變更和提交新任務出現並行時,不要去執行新任務儘早終止執行緒池
if (!canRunInCurrentRunState(true) && super.remove(task)) {
task.cancel(false);
} else {
// 確保至少有一個工作執行緒會處理當前提交的任務
super.ensurePrestart();
}
}
}
/**
* Returns current nanosecond time.
*/
final long now() {
return System.nanoTime();
}
/**
* 獲得下一次排程的絕對時間
* */
private long triggerTime(long delay, TimeUnit unit) {
// 統一轉成nanos級別計算
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
/**
* 獲得下一次排程的絕對時間
* @param delay 延遲時間(單位nanos)
*/
long triggerTime(long delay) {
if(delay < (Long.MAX_VALUE >> 1)){
// delay小於Long.MAX_VALUE/2,肯定不會發生compareTo時的溢位問題,直接正常累加delay即可
return now() + delay;
}else{
// delay大於Long.MAX_VALUE/2,可能會發生compareTo時的溢位問題,在overflowFree檢查並做必要的修正
return now() + overflowFree(delay);
}
}
固定頻率和固定延遲的週期性任務最大的區別就在於setNextRunTime方法中對於下一次排程時間計算的方式不同
ScheduledThreadPoolExecutor作為一個單機純記憶體的延時/定時任務排程框架能夠很好的應對日常開發中出現的多數需求,但其還是存在著一些缺陷。