SOFAJRaft的定時任務排程器是基於Netty來實現的,所以本文將會基於Netty時間輪演演算法,然後再結合SOFAJRaft原始碼進行分析。
@Author:Akai-yuan
@更新時間:2023/1/29
一個時間輪演演算法的組成成分圖:
一個基於Netty實現的時間輪(HashedWheelTimer)有三個核心部分:HashedWheelTimeout(任務封裝)、HashedWheelBucket(任務連結串列)、Worker(任務執行執行緒)
HashedWheelTimer欄位的具體作用全部以註釋的形式標記在以下程式碼塊中。
我們可以先看看HashedWheelTimer的屬性,看不懂沒有關係,可以先大致瞭解一下一個時間輪的屬性有些什麼。
//紀錄檔
private static final Logger LOG = LoggerFactoryLoggerFactory.getLogger(HashedWheelTimer.class);
//範例數量限制為256
private static final int INSTANCE_COUNT_LIMIT = 256;
//範例計數器
private static final AtomicInteger instanceCounter = new AtomicInteger();
//超過範例最大數量後是否警告標識,配合INSTANCE_COUNT_LIMIT欄位使用
private static final AtomicBoolean warnedTooManyInstances = new AtomicBoolean();
//原子更新欄位類,用於原子更新workerState屬性
private static final AtomicIntegerFieldUpdater<HashedWheelTimer> workerStateUpdater =
AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class,"workerState");
//繼承自Runnable,Worker是整個時間輪的執行流程管理者
private final Worker worker = new Worker();
//工作執行緒
private final Thread workerThread;
//工作狀態碼常數類【0 - init, 1 - started, 2 - shut down】
public static final int WORKER_STATE_INIT = 0;
public static final int WORKER_STATE_STARTED = 1;
public static final int WORKER_STATE_SHUTDOWN = 2;
//工作狀態碼
@SuppressWarnings({ "unused", "FieldMayBeFinal" })
private volatile int workerState;
// tick的時長,也就是指標多久轉一格
private final long tickDuration;
//時間輪陣列,每個位置是一個HashedWheelBucket
private final HashedWheelBucket[] wheel;
//定址識別符號用於快速定址
//公式:mask==wheel.length-1
//原理:當x=2^n(n為自然數)時 a%x=a&(x-1)
private final int mask;
//一個等待startTime初始化的計數器
private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
//用來暫時存放待加入時間輪的任務的佇列
private final Queue<HashedWheelTimeout> timeouts = new ConcurrentLinkedQueue<>();
//用來暫時存放已被取消的任務的佇列
private final Queue<HashedWheelTimeout> cancelledTimeouts = new ConcurrentLinkedQueue<>();
//未執行任務的計數器
private final AtomicLong pendingTimeouts = new AtomicLong(0);
//未執行任務的最大數量
private final long maxPendingTimeouts;
//開始時間
private volatile long startTime;
這個構造器裡面主要做一些初始化的工作。
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel,
long maxPendingTimeouts) {
//判空
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (tickDuration <= 0) {
throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
}
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
}
//將ticksPerWheel規格化為2的冪,並初始化輪子
wheel = createWheel(ticksPerWheel);
//定址識別符號
mask = wheel.length - 1;
//將tickDuration(時間單位為unit)轉換為納秒
this.tickDuration = unit.toNanos(tickDuration);
//防止溢位,指標轉動的時間間隔不能超過:Long.MAX_VALUE/wheel.length
if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(String.format(
"tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
tickDuration, Long.MAX_VALUE/ wheel.length));
}
//將worker包裝成thread
workerThread = threadFactory.newThread(worker);
//預設-1
this.maxPendingTimeouts = maxPendingTimeouts;
//如果HashedWheelTimer範例太多,會列印error紀錄檔
if (instanceCounter.incrementAndGet() > INSTANCE_COUNT_LIMIT
&& warnedTooManyInstances.compareAndSet(false, true)) {
reportTooManyInstances();
}
}
實現原理圖:
HashedWheelTimer是整個時間輪演演算法的核心類,通過指定的Hash規則將不同TimeOut定時任務劃分到HashedWheelBucket進行管理,而HashedWheelBucket利用雙向連結串列結構維護了某一時刻需要執行的定時任務列表。
接上文SOFAJRaft原始碼閱讀-模組啟動過程,我們知道,在NodeImpl#init方法中,構造了多個RepeatedTimer範例:voteTimer、electionTimer、stepDownTimer、snapshotTimer。並且重寫了RepeatedTimer#onTrigger和RepeatedTimer#adjustTimeout兩個方法。緊接著,NodeImpl中的多個方法(如:init、electSelf、becomeLeader)會對這些RepeatedTimer範例呼叫RepeatedTimer#start方法啟動。
public void start() {
this.lock.lock();
try {
if (this.destroyed) {
return;
}
if (!this.stopped) {
return;
}
this.stopped = false;
if (this.running) {
return;
}
this.running = true;
schedule();
} finally {
this.lock.unlock();
}
}
RepeatedTimer#start中會呼叫RepeatedTimer#schedule:
private void schedule() {
if (this.timeout != null) {
this.timeout.cancel();
}
final TimerTask timerTask = timeout -> {
try {
//執行onTrigger,並設定狀態引數
RepeatedTimer.this.run();
} catch (final Throwable t) {
LOG.error("Run timer task failed, taskName={}.", RepeatedTimer.this.name, t);
}
};
this.timeout = this.timer.newTimeout(timerTask, adjustTimeout(this.timeoutMs), TimeUnit.MILLISECONDS);
}
HashedWheelTimer#cancel:
取消一個任務,並將其放入另一個cancelledTimeouts佇列
public boolean cancel() {
//只更新將在下一刻從HashedWheelBucket中刪除的狀態
if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
return false;
}
//如果一個任務應該被取消,我們將其放入另一個cancelledTimeouts佇列,該佇列將在每次tick時處理。
//因此,這意味著我們將有一個GC延遲,最大為1個tick的持續時間,這已經足夠好了。
//這樣,我們可以再次使用MpscLinkedQueue,從而儘可能減少鎖定/開銷。
timer.cancelledTimeouts.add(this);
return true;
}
重點理解HashedWheelTimer#newTimeout:
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
if (task == null) {
throw new NullPointerException("task");
}
if (unit == null) {
throw new NullPointerException("unit");
}
//等待的任務數 +1
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
// 如果時間輪內等待的任務數大於最大值,任務會被拋棄
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount
+ ") is greater than or equal to maximum allowed pending "
+ "timeouts (" + maxPendingTimeouts + ")");
}
// 開啟時間輪內的執行緒
start();
// 計算當前新增任務的執行時間
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
// 防止溢位
if (delay > 0 && deadline < 0) {
deadline = Long.MAX_VALUE;
}
// 將任務加入佇列(注意,此時還未加入到時間輪中)
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
timeouts.add(timeout);
return timeout;
}
我們知道,新新增的任務會先儲存在timeouts佇列中,當時間輪的時鐘撥動時才會判斷是否將佇列中的任務載入進時間輪。那麼工作執行緒開啟後,start() 方法會被阻塞,等工作執行緒(workerThread.start())的 startTime 屬性初始化完成後才被喚醒。
因為上面的 newTimeout 方法線上程開啟後【start()】,需要計算當前新增進來任務的執行時間【long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;】,而這個執行時間是根據 startTime 計算的。
HashedWheelTimer#start:
public void start() {
// Lock Free設計。可能有多個執行緒呼叫啟動方法,這裡使用AtomicIntegerFieldUpdater原子的更新時間輪的狀態,
// 它是JUC裡面的類,利用反射進行原子操作。有比AtomicInteger更好的效能和更低得記憶體佔用
switch (workerStateUpdater.get(this)) {
case WORKER_STATE_INIT:
if (workerStateUpdater.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
workerThread.start();
}
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}
// startTimeInitialized 是一個 CountDownLatch,目的是為了保證工作執行緒的 startTime 屬性初始化
// startTime的初始化和startTimeInitialized.countDown()方法會在Worker#run
// [也就是workerThread.start()中]完成
while (startTime == 0) {
try {
startTimeInitialized.await();
} catch (InterruptedException ignore) {
// Ignore - it will be ready very soon.
}
}
}
時間輪每撥動一次就會觸發tick++,然後tick與mask(時間輪陣列長度 - 1)進行 & 運算,可以快速定位時間輪陣列內的槽【mask定址識別符號用於快速定址,其原理:當x=2^n(n為自然數)時 a%x=a&(x-1)】。因為 tick 值一直在增加,所以時間輪陣列看起來就像一個不斷迴圈的圓。
public void run() {
// 初始化 startTime
startTime = System.nanoTime();
if (startTime == 0) {
startTime = 1;
}
// 用來喚醒被阻塞的 HashedWheelTimer#start() 方法,保證 startTime 初始化
startTimeInitialized.countDown();
do {
// 時鐘撥動,有返回值(必須是正數)的時候說明可以撥動時鐘了
final long deadline = waitForNextTick();
if (deadline > 0) {
int idx = (int) (tick & mask);
// 處理過期的任務
processCancelledTasks();
HashedWheelBucket bucket = wheel[idx];
// 將任務載入進時間輪
transferTimeoutsToBuckets();
// 執行當前時間輪槽內的任務
bucket.expireTimeouts(deadline);
tick++;
}
} while (workerStateUpdater.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
// 時間輪關閉,將還未執行的任務以列表的形式儲存到 unprocessedTimeouts 集合中,在 stop 方法中返回出去
// 還未執行的任務可能會在兩個地方,一:時間輪陣列內,二:佇列中
for (HashedWheelBucket bucket : wheel) {
bucket.clearTimeouts(unprocessedTimeouts);
}
for (;;) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
if (!timeout.isCancelled()) {
unprocessedTimeouts.add(timeout);
}
}
processCancelledTasks();
}
Worker#waitForNextTick:
private long waitForNextTick() {
// 計算時鐘下次撥動的相對時間
long deadline = tickDuration * (tick + 1);
for (;;) {
// 獲取當前時間的相對時間
final long currentTime = System.nanoTime() - startTime;
// 計算距離時鐘下次撥動的時間
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
// <=0 說明可以撥動時鐘了
if (sleepTimeMs <= 0) {
if (currentTime == Long.MIN_VALUE) {
return -Long.MAX_VALUE;
} else {
return currentTime;
}
}
// sleep 到下次時鐘撥動
try {
Thread.sleep(sleepTimeMs);
} catch (InterruptedException ignored) {
if (workerStateUpdater.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
return Long.MIN_VALUE;
}
}
}
}
Worker#processCancelledTasks:
private void processCancelledTasks() {
//遍歷cancelledTimeouts中所有範例並從其對應HashedWheelBucket中移除
for (;;) {
HashedWheelTimeout timeout = cancelledTimeouts.poll();
if (timeout == null) {
// all processed
break;
}
try {
timeout.remove();
} catch (Throwable t) {
if (LOG.isWarnEnabled()) {
LOG.warn("An exception was thrown while process a cancellation task", t);
}
}
}
}
Worker#transferTimeoutsToBuckets:
在上面也提到過,任務剛加進來不會立即到時間輪中去,而是暫時儲存到一個佇列中,當時間輪時鐘撥動時,會將任務從佇列中載入進時間輪內。
private void transferTimeoutsToBuckets() {
// 一次最多隻處理佇列中的 100000 個任務
for (int i = 0; i < 100000; i++) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
// all processed
break;
}
// 過濾已經取消的任務
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
// Was cancelled in the meantime.
continue;
}
// 計算當前任務到執行還需要經過幾次時鐘撥動
// 假設時間輪陣列大小是 10,calculated 為 12,需要時間輪轉動一圈加兩次時鐘撥動後後才能執行這個任務,因此還需要計算一下圈數
long calculated = timeout.deadline / tickDuration;
// 計算當前任務到執行還需要經過幾圈時鐘撥動
timeout.remainingRounds = (calculated - tick) / wheel.length;
// 有的任務可能在佇列裡很長時間,時間過期了也沒有被排程,將這種情況的任務放在當前輪次內執行
final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
// 計算任務在時間輪陣列中的槽
int stopIndex = (int) (ticks & mask);
HashedWheelBucket bucket = wheel[stopIndex];
// 將任務放到時間輪的陣列中,多個任務可能定位時間輪的同一個槽,這些任務通過以連結串列的形式連結
bucket.addTimeout(timeout);
}
}
HashedWheelBucket#expireTimeouts:
時間輪槽內的任務以連結串列形式儲存,這些任務執行的時間可能會不一樣,有的在當前時鐘執行,有的在下一圈或者之後幾圈對應的時鐘才會執行。當任務在當前時鐘執行時,需要將這個任務從連結串列中刪除,重新維護連結串列關係。
public void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;
// process all timeouts
while (timeout != null) {
HashedWheelTimeout next = timeout.next;
// 任務執行的圈數 > 0,表示任務還需要經過 remainingRounds 圈時鐘迴圈才能執行
if (timeout.remainingRounds <= 0) {
// 從連結串列中移除當前任務,並返回連結串列中下一個任務
next = remove(timeout);
if (timeout.deadline <= deadline) {
// 執行任務
timeout.expire();
} else {
// The timeout was placed into a wrong slot. This should never happen.
throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)",
timeout.deadline, deadline));
}
} else if (timeout.isCancelled()) {
// 過濾取消的任務
next = remove(timeout);
} else {
// 圈數 -1
timeout.remainingRounds--;
}
timeout = next;
}
}
HashedWheelTimeout#expire:
public void expire() {
// CAS任務狀態變換
if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
return;
}
try {
task.run(this);
} catch (Throwable t) {
if (LOG.isWarnEnabled()) {
LOG.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
}
}
}
HashedWheelTimer#stop():
觸發終止時間輪的鏈路為:NodeImpl#destroyAllTimers()->RepeatedTimer#destroy()->timer.stop()
public Set<Timeout> stop() {
// 終止時間輪的執行緒不能是時間輪的工作執行緒
if (Thread.currentThread() == workerThread) {
throw new IllegalStateException(HashedWheelTimer.class.getSimpleName() + ".stop() cannot be called from "
+ TimerTask.class.getSimpleName());
}
// 將時間輪的狀態修改為 WORKER_STATE_SHUTDOWN,這裡有兩種情況
// 一:時間輪是 WORKER_STATE_INIT 狀態,表明時間輪從建立到終止一直沒有任務進來
// 二:時間輪是 WORKER_STATE_STARTED 狀態,多個執行緒嘗試終止時間輪,只有一個操作成功
if (!workerStateUpdater.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
// 程式碼走到這裡,時間輪只能是兩種狀態中的一個,WORKER_STATE_INIT 和 WORKER_STATE_SHUTDOWN
// 為 WORKER_STATE_INIT 表示時間輪沒有任務,因此不用返回未處理的任務,但是需要將時間輪範例 -1
// 為 WORKER_STATE_SHUTDOWN 表示是 CAS 操作失敗,什麼都不用做,因為 CAS 成功的執行緒會處理
if (workerStateUpdater.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
// 時間輪範例物件 -1
instanceCounter.decrementAndGet();
}
// CAS 操作失敗,或者時間輪沒有處理過任務,返回空的任務列表
return Collections.emptySet();
}
try {
boolean interrupted = false;
while (workerThread.isAlive()) {
// 中斷時間輪工作執行緒
workerThread.interrupt();
try {
// 終止時間輪的執行緒等待時間輪工作執行緒 100ms,這個過程主要是為了時間輪工作執行緒處理未執行的任務
workerThread.join(100);
} catch (InterruptedException ignored) {
interrupted = true;
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
} finally {
instanceCounter.decrementAndGet();
}
// 返回未處理的任務
return worker.unprocessedTimeouts();
}
SOFAJRaft採用的是Netty的時間輪演演算法來實現任務排程器,但是Netty的時間輪演演算法存在一定缺陷,比如:它是通過單執行緒實現的,如果在執行任務的過程中出現阻塞,會影響後面任務執行。Netty 中的時間輪並不適合建立延遲時間跨度很大的任務,比如往時間輪內丟成百上千個任務並設定 10 天后執行,這樣可能會導致連結串列過長 round 值很大,而且這些任務在執行之前會一直佔用記憶體。
在閱讀這部分程式碼的時候,發生了一個有趣的事情。作者發現在時間輪演演算法中有一部分程式碼是可以被優化的:
在HashedWheelTimer#normalizeTicksPerWheel方法中,當ticksPerWheel的值較大時,這個方法會迴圈很多次,方法執行時間會不穩定,導致效率可能會偏低。感覺可以使用java8 HashMap的相關實現來完善改演演算法,具體實現如下:
int n = ticksPerWheel - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
// 此處1073741824 = 2^30,防止溢位
return (n < 0) ? 1 : (n >= 1073741824) ? 1073741824 : n + 1;
想到這些,於是我就去給SOFAJRaft社群提了一個issue,得到了幾位大佬的approve。於是乎我就提了PR。雖然看程式碼確實頭疼,但是整個過程還是挺快樂。
到這裡SOFAJRaft的定時任務排程器就差不多完整的走了一遍,第一遍看確實很容易懵逼,但是再讀幾遍還是會感覺很有成就感的。作者總結完這篇文章,差不多剛過完年,希望後面能繼續堅持下去。