SOFAJRaft原始碼閱讀(肆)-Netty時間輪演演算法的實踐

2023-01-29 18:00:52

SOFAJRaft的定時任務排程器是基於Netty來實現的,所以本文將會基於Netty時間輪演演算法,然後再結合SOFAJRaft原始碼進行分析。
@Author:Akai-yuan
@更新時間:2023/1/29

1.HashedWheelTimer概覽

一個時間輪演演算法的組成成分圖:
一個基於Netty實現的時間輪(HashedWheelTimer)有三個核心部分:HashedWheelTimeout(任務封裝)、HashedWheelBucket(任務連結串列)、Worker(任務執行執行緒)

屬性概覽

HashedWheelTimer欄位的具體作用全部以註釋的形式標記在以下程式碼塊中。
我們可以先看看HashedWheelTimer的屬性,看不懂沒有關係,可以先大致瞭解一下一個時間輪的屬性有些什麼。

	//紀錄檔
	private static final Logger LOG = LoggerFactoryLoggerFactory.getLogger(HashedWheelTimer.class);
    //範例數量限制為256
	private static final int INSTANCE_COUNT_LIMIT   = 256;
	//範例計數器
    private static final AtomicInteger instanceCounter = new AtomicInteger();
	//超過範例最大數量後是否警告標識,配合INSTANCE_COUNT_LIMIT欄位使用
    private static final AtomicBoolean warnedTooManyInstances = new AtomicBoolean();
	//原子更新欄位類,用於原子更新workerState屬性
    private static final AtomicIntegerFieldUpdater<HashedWheelTimer> workerStateUpdater = 
        AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class,"workerState");
	//繼承自Runnable,Worker是整個時間輪的執行流程管理者
    private final Worker worker = new Worker();
	//工作執行緒
    private final Thread workerThread;
	//工作狀態碼常數類【0 - init, 1 - started, 2 - shut down】
    public static final int WORKER_STATE_INIT      = 0;
    public static final int WORKER_STATE_STARTED   = 1;
    public static final int WORKER_STATE_SHUTDOWN  = 2;
	//工作狀態碼
    @SuppressWarnings({ "unused", "FieldMayBeFinal" })
    private volatile int workerState;
 	// tick的時長,也就是指標多久轉一格
    private final long tickDuration;
	//時間輪陣列,每個位置是一個HashedWheelBucket
    private final HashedWheelBucket[] wheel;
	//定址識別符號用於快速定址
	//公式:mask==wheel.length-1
	//原理:當x=2^n(n為自然數)時 a%x=a&(x-1)
    private final int mask;
    //一個等待startTime初始化的計數器
    private final CountDownLatch startTimeInitialized   = new CountDownLatch(1);
	//用來暫時存放待加入時間輪的任務的佇列
    private final Queue<HashedWheelTimeout> timeouts = new ConcurrentLinkedQueue<>();
	//用來暫時存放已被取消的任務的佇列
    private final Queue<HashedWheelTimeout> cancelledTimeouts = new ConcurrentLinkedQueue<>();
    //未執行任務的計數器
	private final AtomicLong pendingTimeouts = new AtomicLong(0);
    //未執行任務的最大數量
	private final long maxPendingTimeouts;
    //開始時間
	private volatile long startTime;

構造器概覽

這個構造器裡面主要做一些初始化的工作。

  1. 初始化一個陣列長度為2048的Wheel時間輪。由於傳入的陣列長度可能為Big Number,所以我去SOFAJRaft上提了一個issue,建議採用JAVA8-HashMap的相關實現來完善該演演算法,可見於:ISSUE-時間輪演演算法存在多回圈低效率問題
  2. 初始化mask,用來快速計算槽位的下標。
  3. 初始化tickDuration並轉化成納秒
  4. 校驗整個時間輪走完的時間不能過長
  5. 將worker包裝成thread
  6. 限制HashedWheelTimer範例數量
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel,
                            long maxPendingTimeouts) {
    	//判空
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }
        if (tickDuration <= 0) {
            throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
        }
        if (ticksPerWheel <= 0) {
            throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
        }
        //將ticksPerWheel規格化為2的冪,並初始化輪子
        wheel = createWheel(ticksPerWheel);
        //定址識別符號
        mask = wheel.length - 1;
        //將tickDuration(時間單位為unit)轉換為納秒
        this.tickDuration = unit.toNanos(tickDuration);
        //防止溢位,指標轉動的時間間隔不能超過:Long.MAX_VALUE/wheel.length
        if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
            throw new IllegalArgumentException(String.format(
 			"tickDuration: %d (expected: 0 < tickDuration in nanos < %d", 
                tickDuration, Long.MAX_VALUE/ wheel.length));
        }
    	//將worker包裝成thread
        workerThread = threadFactory.newThread(worker);
    	//預設-1
        this.maxPendingTimeouts = maxPendingTimeouts;
    	//如果HashedWheelTimer範例太多,會列印error紀錄檔
        if (instanceCounter.incrementAndGet() > INSTANCE_COUNT_LIMIT
            && warnedTooManyInstances.compareAndSet(false, true)) {
            reportTooManyInstances();
        }
    }

實現原理圖:

HashedWheelTimer是整個時間輪演演算法的核心類,通過指定的Hash規則將不同TimeOut定時任務劃分到HashedWheelBucket進行管理,而HashedWheelBucket利用雙向連結串列結構維護了某一時刻需要執行的定時任務列表。

接上文SOFAJRaft原始碼閱讀-模組啟動過程,我們知道,在NodeImpl#init方法中,構造了多個RepeatedTimer範例:voteTimer、electionTimer、stepDownTimer、snapshotTimer。並且重寫了RepeatedTimer#onTrigger和RepeatedTimer#adjustTimeout兩個方法。緊接著,NodeImpl中的多個方法(如:init、electSelf、becomeLeader)會對這些RepeatedTimer範例呼叫RepeatedTimer#start方法啟動。

2.啟動計時器

  • 加ReentrantLock鎖,保證只能一個執行緒呼叫這個方法
  • 啟動狀態引數校驗
  • 呼叫RepeatedTimer#schedule方法
  • 釋放鎖
    public void start() {
        this.lock.lock();
        try {
            if (this.destroyed) {
                return;
            }
            if (!this.stopped) {
                return;
            }
            this.stopped = false;
            if (this.running) {
                return;
            }
            this.running = true;
            schedule();
        } finally {
            this.lock.unlock();
        }
    }

3.任務排程

RepeatedTimer#start中會呼叫RepeatedTimer#schedule:

  • 如果RepeatedTimer中維護的HashedWheelTimeout(任務)不為空,則取消(HashedWheelTimer#cancel)該任務。
  • 宣告一個TimerTask,並通過HashedWheelTimer#newTimeout()構造一個HashedWheelTimeout
private void schedule() {
        if (this.timeout != null) {
            this.timeout.cancel();
        }
        final TimerTask timerTask = timeout -> {
            try {
                //執行onTrigger,並設定狀態引數
                RepeatedTimer.this.run();
            } catch (final Throwable t) {
                LOG.error("Run timer task failed, taskName={}.", RepeatedTimer.this.name, t);
            }
        };
        this.timeout = this.timer.newTimeout(timerTask, adjustTimeout(this.timeoutMs), TimeUnit.MILLISECONDS);
    }

HashedWheelTimer#cancel:
取消一個任務,並將其放入另一個cancelledTimeouts佇列

public boolean cancel() {
    		//只更新將在下一刻從HashedWheelBucket中刪除的狀態
           	if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
                return false;
            }
    		//如果一個任務應該被取消,我們將其放入另一個cancelledTimeouts佇列,該佇列將在每次tick時處理。
			//因此,這意味著我們將有一個GC延遲,最大為1個tick的持續時間,這已經足夠好了。
    		//這樣,我們可以再次使用MpscLinkedQueue,從而儘可能減少鎖定/開銷。
            timer.cancelledTimeouts.add(this);
            return true;
        }

4.往時間輪內新增任務

重點理解HashedWheelTimer#newTimeout:

  • 判空
  • 校驗pendingTimeoutsCount引數的合理性,如果maxPendingTimeouts(最大的等待加入的任務的數量)為0或負數,則表示不需要對pendingTimeoutsCount進行數量限制,否則會進行比較,超過限制則會丟擲異常。
  • 呼叫HashedWheelTimer#start()啟動時間輪。
  • 計算當前新增任務的執行時間。傳入的delay引數由RepeatedTimer#adjustTimeout(this.timeoutMs)獲取
  • 防溢位操作
  • 最後將任務加入佇列,此時還未加入到時間輪中,需要等待時鐘撥動(也就是當呼叫鏈路HashedWheelTimer#start->workerThread#start->Worker#run->waitForNextTick返回了引數時)才會觸發往時間輪內新增任務
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }
        //等待的任務數 +1
        long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
        // 如果時間輪內等待的任務數大於最大值,任務會被拋棄
        if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
            pendingTimeouts.decrementAndGet();
            throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount
                        + ") is greater than or equal to maximum allowed pending "
                        + "timeouts (" + maxPendingTimeouts + ")");
        }
        // 開啟時間輪內的執行緒
        start();
        // 計算當前新增任務的執行時間
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
        // 防止溢位
        if (delay > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }
        // 將任務加入佇列(注意,此時還未加入到時間輪中)
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        timeouts.add(timeout);
        return timeout;
    }

5.開啟時間輪內的執行緒

我們知道,新新增的任務會先儲存在timeouts佇列中,當時間輪的時鐘撥動時才會判斷是否將佇列中的任務載入進時間輪。那麼工作執行緒開啟後,start() 方法會被阻塞,等工作執行緒(workerThread.start())的 startTime 屬性初始化完成後才被喚醒。
因為上面的 newTimeout 方法線上程開啟後【start()】,需要計算當前新增進來任務的執行時間【long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;】,而這個執行時間是根據 startTime 計算的。
HashedWheelTimer#start:

  • 判斷當前時間輪的狀態,如果是初始化,則啟動worker執行緒,啟動整個時間輪;如果已經啟動則略過;如果是已經停止,則報錯。
  • 如果初始化未完成,則需要等待worker執行緒完成startTime的初始化
    public void start() {
    // Lock Free設計。可能有多個執行緒呼叫啟動方法,這裡使用AtomicIntegerFieldUpdater原子的更新時間輪的狀態,
    // 它是JUC裡面的類,利用反射進行原子操作。有比AtomicInteger更好的效能和更低得記憶體佔用
        switch (workerStateUpdater.get(this)) {
            case WORKER_STATE_INIT:
                if (workerStateUpdater.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                    workerThread.start();
                }
                break;
            case WORKER_STATE_STARTED:
                break;
            case WORKER_STATE_SHUTDOWN:
                throw new IllegalStateException("cannot be started once stopped");
            default:
                throw new Error("Invalid WorkerState");
        }
    	// startTimeInitialized 是一個 CountDownLatch,目的是為了保證工作執行緒的 startTime 屬性初始化
        // startTime的初始化和startTimeInitialized.countDown()方法會在Worker#run
        // [也就是workerThread.start()中]完成
        while (startTime == 0) {
            try {
                startTimeInitialized.await();
            } catch (InterruptedException ignore) {
                // Ignore - it will be ready very soon.
            }
        }
    }

6.時間輪排程

時間輪每撥動一次就會觸發tick++,然後tick與mask(時間輪陣列長度 - 1)進行 & 運算,可以快速定位時間輪陣列內的槽【mask定址識別符號用於快速定址,其原理:當x=2^n(n為自然數)時 a%x=a&(x-1)】。因為 tick 值一直在增加,所以時間輪陣列看起來就像一個不斷迴圈的圓。

  • 先初始化 startTime 值,因為後面任務執行的時間是根據 startTime 計算的
  • 時鐘撥動,如果時間未到,則 sleep 一會兒
  • 處理過期的任務
  • 將任務載入進時間輪
  • 執行當前時鐘對應時間輪內的任務
  • 時間輪關閉,將所有未執行的任務封裝到 unprocessedTimeouts 集合中,在 stop 方法中返回出去
  • 處理過期的任務
public void run() {
            // 初始化 startTime
            startTime = System.nanoTime();
            if (startTime == 0) {
                startTime = 1;
            }

            // 用來喚醒被阻塞的 HashedWheelTimer#start() 方法,保證 startTime 初始化
            startTimeInitialized.countDown();

            do {
                // 時鐘撥動,有返回值(必須是正數)的時候說明可以撥動時鐘了
                final long deadline = waitForNextTick();
                if (deadline > 0) {
                    int idx = (int) (tick & mask);
                    // 處理過期的任務
                    processCancelledTasks();
                    HashedWheelBucket bucket = wheel[idx];
                    // 將任務載入進時間輪
                    transferTimeoutsToBuckets();
                    // 執行當前時間輪槽內的任務
                    bucket.expireTimeouts(deadline);
                    tick++;
                }
            } while (workerStateUpdater.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

            // 時間輪關閉,將還未執行的任務以列表的形式儲存到 unprocessedTimeouts 集合中,在 stop 方法中返回出去
            // 還未執行的任務可能會在兩個地方,一:時間輪陣列內,二:佇列中
            for (HashedWheelBucket bucket : wheel) {
                bucket.clearTimeouts(unprocessedTimeouts);
            }
            for (;;) {
                HashedWheelTimeout timeout = timeouts.poll();
                if (timeout == null) {
                    break;
                }
                if (!timeout.isCancelled()) {
                    unprocessedTimeouts.add(timeout);
                }
            }
            processCancelledTasks();
        }

7.時間撥動

Worker#waitForNextTick:

  • 當時鍾撥動一次後,應該計算下一次時鐘撥動的時間
  • 獲取當前時間的相對時間
  • 計算距離時鐘下次撥動的時間,也就是sleepTimeMs。這裡之所以加 999999 後再除 10000000, 是為了保證足夠的 sleep 時間。例如:當 deadline - currentTime = 2000002 的時候,如果不加 999999,則只睡了 2ms。而 2ms 其實是未到達 deadline 時間點的,所以為了使上述情況能 sleep 足夠的時間,加上 999999 後,會多睡 1ms。
  • 如果還沒到就 sleep 一會兒,等到撥動時間再醒來。
  • 進入下一次迴圈,直到sleepTimeMs<=0 說明可以撥動時鐘了
        private long waitForNextTick() {
            // 計算時鐘下次撥動的相對時間
            long deadline = tickDuration * (tick + 1);

            for (;;) {
                // 獲取當前時間的相對時間
                final long currentTime = System.nanoTime() - startTime;
                // 計算距離時鐘下次撥動的時間
                long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
            	// <=0 說明可以撥動時鐘了
                if (sleepTimeMs <= 0) {
                    if (currentTime == Long.MIN_VALUE) {
                        return -Long.MAX_VALUE;
                    } else {
                        return currentTime;
                    }
                }
            	// sleep 到下次時鐘撥動
                try {
                    Thread.sleep(sleepTimeMs);
                } catch (InterruptedException ignored) {
                    if (workerStateUpdater.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                        return Long.MIN_VALUE;
                    }
                }
            }
        }

8.移除取消的任務

Worker#processCancelledTasks:

  • 遍歷cancelledTimeouts中所有範例並從其對應HashedWheelBucket中移除。
  • 在呼叫HashedWheelTimer的stop方法的時候會將要取消的HashedWheelTimeout範例放入到cancelledTimeouts佇列中,所以這裡只需要迴圈把佇列中的資料取出來,然後呼叫HashedWheelTimeout的remove方法將自己在bucket移除就好了。
        private void processCancelledTasks() {
            //遍歷cancelledTimeouts中所有範例並從其對應HashedWheelBucket中移除
            for (;;) {
                HashedWheelTimeout timeout = cancelledTimeouts.poll();
                if (timeout == null) {
                    // all processed
                    break;
                }
                try {
                    timeout.remove();
                } catch (Throwable t) {
                    if (LOG.isWarnEnabled()) {
                        LOG.warn("An exception was thrown while process a cancellation task", t);
                    }
                }
            }
        }

9.將任務從佇列載入進時間輪

Worker#transferTimeoutsToBuckets:
在上面也提到過,任務剛加進來不會立即到時間輪中去,而是暫時儲存到一個佇列中,當時間輪時鐘撥動時,會將任務從佇列中載入進時間輪內。

  • 每次呼叫這個方法會處理10w個任務,以免阻塞worker執行緒
  • 從timeouts中取出任務
  • 在校驗之後會用timeout的deadline除以每次tick執行的時間tickDuration得出需要經過多少次時鐘撥動才會執行這個timeout的任務
  • 計算時間輪撥動的圈數。由於timeout的deadline實際上還包含了worker執行緒啟動到timeout加入佇列這段時間,所以在算remainingRounds的時候需要減去當前的tick次數。
  • 將任務載入進時間輪對應的槽內,可能有多個任務經過 hash 計算後定位到同一個槽,這些任務會以雙向連結串列的結構儲存,有點類似 HashMap 處理碰撞的情況。
        private void transferTimeoutsToBuckets() {
            // 一次最多隻處理佇列中的 100000 個任務
            for (int i = 0; i < 100000; i++) {
                HashedWheelTimeout timeout = timeouts.poll();
                if (timeout == null) {
                    // all processed
                    break;
                }
                // 過濾已經取消的任務
                if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
                    // Was cancelled in the meantime.
                    continue;
                }
				// 計算當前任務到執行還需要經過幾次時鐘撥動
                // 假設時間輪陣列大小是 10,calculated 為 12,需要時間輪轉動一圈加兩次時鐘撥動後後才能執行這個任務,因此還需要計算一下圈數
                long calculated = timeout.deadline / tickDuration;
                // 計算當前任務到執行還需要經過幾圈時鐘撥動
                timeout.remainingRounds = (calculated - tick) / wheel.length;
				// 有的任務可能在佇列裡很長時間,時間過期了也沒有被排程,將這種情況的任務放在當前輪次內執行
                final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
                // 計算任務在時間輪陣列中的槽
                int stopIndex = (int) (ticks & mask);

                HashedWheelBucket bucket = wheel[stopIndex];
                // 將任務放到時間輪的陣列中,多個任務可能定位時間輪的同一個槽,這些任務通過以連結串列的形式連結
                bucket.addTimeout(timeout);
            }
        }

10.執行任務

HashedWheelBucket#expireTimeouts:
時間輪槽內的任務以連結串列形式儲存,這些任務執行的時間可能會不一樣,有的在當前時鐘執行,有的在下一圈或者之後幾圈對應的時鐘才會執行。當任務在當前時鐘執行時,需要將這個任務從連結串列中刪除,重新維護連結串列關係。

        public void expireTimeouts(long deadline) {
            HashedWheelTimeout timeout = head;

            // process all timeouts
            while (timeout != null) {
                HashedWheelTimeout next = timeout.next;
                // 任務執行的圈數 > 0,表示任務還需要經過 remainingRounds 圈時鐘迴圈才能執行
                if (timeout.remainingRounds <= 0) {
                    // 從連結串列中移除當前任務,並返回連結串列中下一個任務
                    next = remove(timeout);
                    if (timeout.deadline <= deadline) {
                        // 執行任務
                        timeout.expire();
                    } else {
                        // The timeout was placed into a wrong slot. This should never happen.
                        throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)",
                            timeout.deadline, deadline));
                    }
                } else if (timeout.isCancelled()) {
                    // 過濾取消的任務
                    next = remove(timeout);
                } else {
                    // 圈數 -1
                    timeout.remainingRounds--;
                }
                timeout = next;
            }
        }

HashedWheelTimeout#expire:

  • CAS切換任務狀態
  • task.run(this)執行任務
        public void expire() {
            // CAS任務狀態變換
            if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
                return;
            }

            try {
                task.run(this);
            } catch (Throwable t) {
                if (LOG.isWarnEnabled()) {
                    LOG.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
                }
            }
        }

11.終止時間輪

HashedWheelTimer#stop():
觸發終止時間輪的鏈路為:NodeImpl#destroyAllTimers()->RepeatedTimer#destroy()->timer.stop()

  • 當終止時間輪時,將時間輪的狀態修改為 WORKER_STATE_SHUTDOWN。時間輪狀態有兩種情況:(1)WORKER_STATE_INIT:當初始化時間輪物件時並不會立即開啟時間輪工作執行緒,而是第一次新增任務時才開啟,此狀態表示時間輪沒有處理過任務(2)WORKER_STATE_STARTED:時間輪在工作,這裡也有兩種情況,存在並行與不存在並行,如果多個執行緒都嘗試終止時間輪,肯定只能有一個成功。
  • 時間輪停止執行後會將未執行的任務返回出去,至於怎麼處理這些任務,由業務方自己定義,這個流程和執行緒池的 shutdownNow 方法是類似的。
  • 如果時間輪在執行,如果時間輪處於非執行狀態,會把時間輪陣列與佇列中未執行且未取消的任務儲存到 unprocessedTimeouts 集合中。而終止時間輪成功的執行緒只需要等待一會兒,這個等待通過 workerThread.join(100)實現。
    public Set<Timeout> stop() {
        // 終止時間輪的執行緒不能是時間輪的工作執行緒
        if (Thread.currentThread() == workerThread) {
            throw new IllegalStateException(HashedWheelTimer.class.getSimpleName() + ".stop() cannot be called from "
                                            + TimerTask.class.getSimpleName());
        }
        // 將時間輪的狀態修改為 WORKER_STATE_SHUTDOWN,這裡有兩種情況
        // 一:時間輪是 WORKER_STATE_INIT 狀態,表明時間輪從建立到終止一直沒有任務進來
        // 二:時間輪是 WORKER_STATE_STARTED 狀態,多個執行緒嘗試終止時間輪,只有一個操作成功
        if (!workerStateUpdater.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
            // 程式碼走到這裡,時間輪只能是兩種狀態中的一個,WORKER_STATE_INIT 和 WORKER_STATE_SHUTDOWN
            // 為 WORKER_STATE_INIT 表示時間輪沒有任務,因此不用返回未處理的任務,但是需要將時間輪範例 -1
            // 為 WORKER_STATE_SHUTDOWN 表示是 CAS 操作失敗,什麼都不用做,因為 CAS 成功的執行緒會處理
            if (workerStateUpdater.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
                 // 時間輪範例物件 -1
                instanceCounter.decrementAndGet();
            }
        	// CAS 操作失敗,或者時間輪沒有處理過任務,返回空的任務列表
            return Collections.emptySet();
        }

        try {
            boolean interrupted = false;
            while (workerThread.isAlive()) {
                // 中斷時間輪工作執行緒
                workerThread.interrupt();
                try {
                    // 終止時間輪的執行緒等待時間輪工作執行緒 100ms,這個過程主要是為了時間輪工作執行緒處理未執行的任務
                    workerThread.join(100);
                } catch (InterruptedException ignored) {
                    interrupted = true;
                }
            }

            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        } finally {
            instanceCounter.decrementAndGet();
        }
        // 返回未處理的任務
        return worker.unprocessedTimeouts();
    }

寫在後面

SOFAJRaft採用的是Netty的時間輪演演算法來實現任務排程器,但是Netty的時間輪演演算法存在一定缺陷,比如:它是通過單執行緒實現的,如果在執行任務的過程中出現阻塞,會影響後面任務執行。Netty 中的時間輪並不適合建立延遲時間跨度很大的任務,比如往時間輪內丟成百上千個任務並設定 10 天后執行,這樣可能會導致連結串列過長 round 值很大,而且這些任務在執行之前會一直佔用記憶體。
在閱讀這部分程式碼的時候,發生了一個有趣的事情。作者發現在時間輪演演算法中有一部分程式碼是可以被優化的:

在HashedWheelTimer#normalizeTicksPerWheel方法中,當ticksPerWheel的值較大時,這個方法會迴圈很多次,方法執行時間會不穩定,導致效率可能會偏低。感覺可以使用java8 HashMap的相關實現來完善改演演算法,具體實現如下:

int n = ticksPerWheel - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
// 此處1073741824 = 2^30,防止溢位
return (n < 0) ? 1 : (n >= 1073741824) ? 1073741824 : n + 1;

想到這些,於是我就去給SOFAJRaft社群提了一個issue,得到了幾位大佬的approve。於是乎我就提了PR。雖然看程式碼確實頭疼,但是整個過程還是挺快樂。

到這裡SOFAJRaft的定時任務排程器就差不多完整的走了一遍,第一遍看確實很容易懵逼,但是再讀幾遍還是會感覺很有成就感的。作者總結完這篇文章,差不多剛過完年,希望後面能繼續堅持下去。