Redis之時間輪機制(五)

2022-06-19 12:02:36

一、什麼是時間輪

        時間輪這個技術其實出來很久了,在kafka、zookeeper等技術中都有時間輪使用的方式。 時間輪是一種高效利用執行緒資源進行批次化排程的一種排程模型。把大批次的排程任務全部繫結到同一個排程器上,使用這一個排程器來進行所有任務的管理、觸發、以及執行。所以時間輪的模型能夠高效管理各種延時任務、週期任務、通知任務。 以後大家在工作中遇到類似的功能,可以採用時間輪機制。如下圖所示,時間輪,從圖片上來看,就和手錶的表圈是一樣,所以稱為時間輪,是因為它是以時間作為刻度組成的一個環形佇列,這個環形佇列採用陣列來實現,陣列的每個元素稱為槽,每個槽可以放一個定時任務列表,叫HashedWheelBucket,它是一個雙向連結串列,量表的每一項表示一個定時任務項(HashedWhellTimeout),其中封裝了真正的定時任務TimerTask。時間輪是由多個時間格組成,下圖中有8個時間格,每個時間格代表當前時間輪的基本時間跨度(tickDuration),其中時間輪的時間格的個數是固定的。在下圖中,有8個時間格(槽),假設每個時間格的單位為1s,那麼整個時間輪走完一圈需要8s鍾。每秒鐘指標會沿著順時針方向移動一個,這個單位可以設定,比如以秒為單位,可以以一小時為單位,這個單位可以代表時間精度。通過指標移動,來獲得每個時間格中的任務列表,然後遍歷這一個時間格中的雙向連結串列來執行任務,以此迴圈。

 二、時間輪案例使用

這裡使用的時間輪是Netty這個包中提供的,使用方法比較簡單。

  • 先構建一個HashedWheelTimer時間輪。
    • tickDuration: 100 ,表示每個時間格代表當前時間輪的基本時間跨度,這裡是100ms,也就是指標100ms跳動一次,每次跳動一個窗格
    • ticksPerWheel:1024,表示時間輪上一共有多少個窗格,分配的窗格越多,佔用記憶體空間就越大
    • leakDetection:是否開啟記憶體漏失檢測。
    • maxPendingTimeouts[可選引數],最大允許等待的任務數,預設沒有限制。
  • 通過newTimeout()把需要延遲執行的任務新增到時間輪中
@RestController
@RequestMapping("/timer")
public class HashedWheelController {

    //時間輪的定義
    HashedWheelTimer hashedWheelTimer=new HashedWheelTimer(
            new DefaultThreadFactory("demo-timer"),
            100, TimeUnit.MILLISECONDS,1024,false);

    @GetMapping("/{delay}")
    public void tick(@PathVariable("delay")Long delay){
        //SCHEDULED(定時執行的執行緒)
        //Timer(Java原生定時任務執行)
        System.out.println("CurrentTime:"+new Date());
        hashedWheelTimer.newTimeout(timeout -> {
            System.out.println("多少秒後執行這任務:"+new Date());
        },delay,TimeUnit.SECONDS);
    }
}

三、時間輪的原理解析

時間輪的整體原理,分為幾個部分。

建立時間輪

時間輪本質上是一個環狀陣列,比如我們初始化時間輪時:ticksPerWheel=8,那麼意味著這個環狀陣列的長度是8,如圖3-12所示。

HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];

 

 

 新增任務

  • 當通過newTimeout()方法新增一個延遲任務時,該任務首先會加入到一個阻塞佇列中中。然後會有一個定時任務從該佇列獲取任務,新增到時間輪的指定位置,計算方法如下。

//當前任務的開始執行時間除以每個視窗的時間間隔,得到一個calculated值(表示需要經過多少tick,指標沒跳動一個窗格,tick會遞增),單位
為nanos(微毫秒)
long calculated = timeout.deadline / tickDuration;
//計算當前任務需要在時間輪中經歷的圈數,因為當前任務執行時間有可能大於完整一圈的時間,所以需要計算經過幾圈之後才能執行該任務。
timeout.remainingRounds = (calculated - tick) / wheel.length;
//取最大的一個tick,有可能當前任務在佇列中已經過了執行時間,這種情況下直接用calculated這個值就沒意義了。
final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
int stopIndex = (int) (ticks & mask); //通過ticks取模mask,得到一個下標
HashedWheelBucket bucket = wheel[stopIndex]; //把任務新增到指定陣列下標位置

 

 

任務執行

  • Worker執行緒按照每次間隔時間轉動後,得到該時間窗格中的任務連結串列,然後從連結串列的head開始逐個取出任務,有兩個判斷條件
  • 當前任務需要轉動的圈數為0,表示任務是當前圈開始執行
  • 當前任務達到了delay時間,也就是timeout.deadline <= deadline
  • 最終呼叫timeout.expire()方法執行任務。
        public void expireTimeouts(long deadline) {
            HashedWheelTimeout timeout = head;

            // process all timeouts
            while (timeout != null) {
                HashedWheelTimeout next = timeout.next;
                if (timeout.remainingRounds <= 0) {
                    next = remove(timeout);
                    if (timeout.deadline <= deadline) {
                        timeout.expire();
                    } else {
                        // The timeout was placed into a wrong slot. This should never happen.
                        throw new IllegalStateException(String.format(
                                "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
                    }
                } else if (timeout.isCancelled()) {
                    next = remove(timeout);
                } else {
                    timeout.remainingRounds --;
                }
                timeout = next;
            }
        }

四、時間輪的原始碼分析

HashedWheelTimer的構造

  • 呼叫 createWheel 建立一個時間輪,時間輪陣列一定是 2 的冪次方,比如傳入的 ticksPerWheel=6,那麼初始化的 wheel 長度一定是 8,這樣是便於時間格的計算。
  • tickDuration,表示時間輪的跨度,代表每個時間格的時間精度,以納秒的方式來表現。
  • 把工作執行緒 Worker 封裝成 WorkerThread,從名字可以知道,它就是最終那個負責幹活的執行緒。
public HashedWheelTimer(
    ThreadFactory threadFactory,
    long tickDuration, TimeUnit unit, int ticksPerWheel,
    long maxPendingTimeouts) {
    // 建立時間輪基本的資料結構,一個陣列。長度為不小於ticksPerWheel的最小2的n次方
    wheel = createWheel(ticksPerWheel);
    // 這是一個標示符,用來快速計算任務應該呆的格子。
    // 我們知道,給定一個deadline的定時任務,其應該呆的格子=deadline%wheel.length.但是%操作是個相對耗時的操作,所以使用一種變通的位運算代替:
    // 因為一圈的長度為2的n次方,mask = 2^n-1後低位將全部是1,然後deadline&mast == deadline%wheel.length
    // java中的HashMap在進行hash之後,進行index的hash定址定址的演演算法也是和這個一樣的
    mask = wheel.length - 1;
    
    //時間輪的基本時間跨度,(tickDuration傳入是1的話,這裡會轉換成1000000)
    this.tickDuration = unit.toNanos(tickDuration);

    // 校驗是否存在溢位。即指標轉動的時間間隔不能太長而導致tickDuration*wheel.length>Long.MAX_VALUE
    if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
        throw new IllegalArgumentException(String.format(
            "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
            tickDuration, Long.MAX_VALUE / wheel.length));
    }
    //把worker包裝成thread
    workerThread = threadFactory.newThread(worker);
    
    this.maxPendingTimeouts = maxPendingTimeouts;
    //如果HashedWheelTimer範例太多,那麼就會列印一個error紀錄檔
    if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
        WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
        reportTooManyInstances();
    }
}
  • 對傳入的 ticksPerWheel 進行整形
  • 初始化固定長度的 HashedWheelBucket
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
    if (ticksPerWheel <= 0) {
        throw new IllegalArgumentException(
            "ticksPerWheel must be greater than 0: " + ticksPerWheel);
    }
    if (ticksPerWheel > 1073741824) {
        throw new IllegalArgumentException(
            "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
    }
    //對傳入的時間輪大小進行整形,整形成2的冪次方
    ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
    //初始化一個固定長度的Bucket陣列
    HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
    for (int i = 0; i < wheel.length; i++) {
        wheel[i] = new HashedWheelBucket();
    }
    return wheel;
}

新增任務到時間輪

呼叫 newTimeout 方法,把任務新增進來。

public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    if (unit == null) {
        throw new NullPointerException("unit");
    }
    //統計任務個數
    long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
    //判斷最大任務數量是否超過限制
    if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
        pendingTimeouts.decrementAndGet();
        throw new RejectedExecutionException("Number of pending timeouts ("
                                             + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                                             + "timeouts (" + maxPendingTimeouts + ")");
    }
   //如果時間輪沒有啟動,則通過start方法進行啟動
    start();
    
    // Add the timeout to the timeout queue which will be processed on the next tick.
    // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
    
    //計算任務的延遲時間,通過當前的時間+當前任務執行的延遲時間-時間輪啟動的時間。
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

     //在delay為正數的情況下,deadline是不可能為負數
    //如果為負數,那麼說明超過了long的最大值
    if (delay > 0 && deadline < 0) {
        deadline = Long.MAX_VALUE;
    }
    //建立一個Timeout任務,理論上來說,這個任務應該要加入到時間輪的時間格子中,但是這裡並不是先新增到時間格,而是先
    //加入到一個阻塞佇列,然後等到時間輪執行到下一個格子時,再從佇列中取出最多100000個任務新增到指定的時間格(槽)中。
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    timeouts.add(timeout);
    return timeout;
}

start

任務新增到阻塞佇列之後,我們再來看啟動方法。

start 方法會根據當前的 workerState 狀態來啟動時間輪。並且用了 startTimeInitialized 來控制執行緒的執行,如果 workerThread 沒有啟動起來,那麼 newTimeout 方法會一直阻塞在執行 start 方法中。如果不阻塞,newTimeout 方法會獲取不到 startTime。

public void start() {
    //workerState一開始的時候是0(WORKER_STATE_INIT),然後才會設定為1(WORKER_STATE_STARTED)
    switch (WORKER_STATE_UPDATER.get(this)) {
        case WORKER_STATE_INIT:
            if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                workerThread.start();
            }
            break;
        case WORKER_STATE_STARTED:
            break;
        case WORKER_STATE_SHUTDOWN:
            throw new IllegalStateException("cannot be started once stopped");
        default:
            throw new Error("Invalid WorkerState");
    }

    // 等待worker執行緒初始化時間輪的啟動時間
    while (startTime == 0) {
        try {
            //這裡使用countDownLauch來確保排程的執行緒已經被啟動
            startTimeInitialized.await();
        } catch (InterruptedException ignore) {
            // Ignore - it will be ready very soon.
        }
    }
}

啟動時間輪

呼叫 start()方法, 會呼叫 workerThread.start(); 來啟動一個工作執行緒,這個工作執行緒是在構造方法中初始化的,包裝的是一個 Worker 內部執行緒類。

所以直接進入到 Worker 這個類的 run 方法,瞭解下它的設計邏輯:

public void run() {
    // 初始化startTime,表示時間輪的啟動時間
    startTime = System.nanoTime();
    if (startTime == 0) {
        // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
        startTime = 1;
    } 
    
    // 喚醒被阻塞的start()方法。
    startTimeInitialized.countDown();

    do {
        //返回每tick一次的時間間隔
        final long deadline = waitForNextTick();
        if (deadline > 0) {
            //計算時間輪的槽位
            int idx = (int) (tick & mask);
            //移除掉CancelledTask
            processCancelledTasks();
            //得到當前指標位置的時間槽
            HashedWheelBucket bucket =
                wheel[idx];
            //將newTimeout()方法中加入到待處理定時任務佇列中的任務加入到指定的格子中
            transferTimeoutsToBuckets();
            //執行目前指標指向的槽中的bucket連結串列中的任務
            bucket.expireTimeouts(deadline);
            tick++;
        }
    } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
     //如果Worker_State一隻是started狀態,就一直迴圈
    // Fill the unprocessedTimeouts so we can return them from stop() method.
    
    for (HashedWheelBucket bucket : wheel) {
        bucket.clearTimeouts(unprocessedTimeouts); //清除時間輪中不需要處理的任務
    }
    for (; ; ) {
        //遍歷任務佇列,發現如果有任務被取消,則新增到unprocessedTimeouts,也就是不需要處理的佇列中。
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            break;
        }
       
        if (!timeout.isCancelled()) {
            unprocessedTimeouts.add(timeout);
        }
    }
    //處理被取消的任務.
    processCancelledTasks();
}

時間輪指標跳動

這個方法的主要作用就是返回下一個指標指向的時間間隔,然後進行 sleep 操作。

大家可以想象一下,一個鐘錶上秒與秒之間是有時間間隔的,那麼 waitForNextTick 就是根據當前時間計算出跳動到下個時間的時間間隔,然後進行 sleep,然後再返回當前時間距離時間輪啟動時間的時間間隔。

說得再直白一點:,假設當前的 tickDuration 的間隔是 1s,tick 預設 = 0, 此時第一次進來,得到的 deadline=1,也就是下一次跳動的時間間隔是 1s。假設當前處於:
 

private long waitForNextTick() {
    //tick表示總的tick數
    //tickDuration表示每個時間格的跨度,所以deadline返回的是下一次時間輪指標跳動的時間
    long deadline = tickDuration * (tick + 1);

    for (; ; ) {
        //計算當前時間距離啟動時間的時間間隔
        final long currentTime = System.nanoTime() - startTime;
        //通過下一次指標跳動的延遲時間距離當前時間的差額,這個作為sleep時間使用。
        // 其實執行緒是以睡眠一定的時候再來執行下一個ticket的任務的
        long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
        //sleepTimeMs小於零表示走到了下一個時間槽位置
        if (sleepTimeMs <= 0) {
            if (currentTime == Long.MIN_VALUE) {
                return -Long.MAX_VALUE;
            } else {
                return currentTime;
            }
        }
        if (isWindows()) {
            sleepTimeMs = sleepTimeMs / 10 * 10;
        }
        //進入到這裡進行sleep,表示當前時間距離下一次tick時間還有一段距離,需要sleep。
        try {
            Thread.sleep(sleepTimeMs);
        } catch (InterruptedException ignored) {
            if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                return Long.MIN_VALUE;
            }
        }
    }
}

transferTimeoutsToBuckets

轉移任務到時間輪中,前面我們講過,任務新增進來時,是先放入到阻塞佇列。

而在現在這個方法中,就是把阻塞佇列中的資料轉移到時間輪的指定位置。

在這個轉移方法中,寫死了一個迴圈,每次都只轉移 10 萬個任務。

然後根據 HashedWheelTimeout 的 deadline 延遲時間計算出時間輪需要執行多少次才能執行當前的任務,如果當前的任務延遲時間大於時間輪跑一圈所需要的時間,那麼就計算需要跑幾圈才能到這個任務執行。

最後計算出該任務在時間輪中的槽位,新增到時間輪的連結串列中。

private void transferTimeoutsToBuckets() {
    // 迴圈100000次,也就是每次轉移10w個任務
    for (int i = 0; i < 100000; i++) {
        //從阻塞佇列中獲得具體的任務
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            // all processed
            break;
        }
        if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
            // Was cancelled in the meantime.
            continue;
        }
        //計算tick次數,deadline表示當前任務的延遲時間,tickDuration表示時間槽的間隔,兩者相除就可以計算當前任務需要tick幾次才能被執行
        long calculated = timeout.deadline / tickDuration;
         // 計算剩餘的輪數, 只有 timer 走夠輪數, 並且到達了 task 所在的 slot, task 才會過期.(被執行)
        timeout.remainingRounds = (calculated - tick) / wheel.length;
     
        //如果任務在timeouts佇列裡面放久了, 以至於已經過了執行時間, 這個時候就使用當前tick, 也就是放到當前bucket, 此方法呼叫完後就會被執行
        final long ticks = Math.max(calculated, tick);
        // 算出任務應該插入的 wheel 的 slot, stopIndex = tick 次數 & mask, mask = wheel.length - 1
        int stopIndex = (int) (ticks & mask);
        //把timeout任務插入到指定的bucket鏈中。
        HashedWheelBucket bucket = wheel[stopIndex];
        bucket.addTimeout(timeout);
    }
}

執行時間輪中的任務

當指標跳動到某一個時間槽中時,會就觸發這個槽中的任務的執行。該功能是通過 expireTimeouts 來實現

這個方法的主要作用是: 過期並執行格子中到期的任務。也就是當 tick 進入到指定格子時,worker 執行緒會呼叫這個方法

HashedWheelBucket 是一個連結串列,所以我們需要從 head 節點往下進行遍歷。如果連結串列沒有遍歷到連結串列尾部那麼就繼續往下遍歷。

獲取的 timeout 節點節點,如果剩餘輪數 remainingRounds 大於 0,那麼就說明要到下一圈才能執行,所以將剩餘輪數減一;

如果當前剩餘輪數小於等於零了,那麼就將當前節點從 bucket 連結串列中移除,並判斷一下當前的時間是否大於 timeout 的延遲時間,如果是則呼叫 timeout 的 expire 執行任務。

void expireTimeouts(long deadline) {
    HashedWheelTimeout timeout = head;

    // 遍歷當前時間槽中的所有任務
    while (timeout != null) {
        HashedWheelTimeout next = timeout.next;
        //如果當前任務要被執行,那麼remainingRounds應該小於或者等於0
        if (timeout.remainingRounds <= 0) {
            //從bucket連結串列中移除當前timeout,並返回連結串列中下一個timeout
            next = remove(timeout);
            //如果timeout的時間小於當前的時間,那麼就呼叫expire執行task
            if (timeout.deadline <= deadline) {
                timeout.expire();
            } else {
                 //不可能發生的情況,就是說round已經為0了,deadline卻>當前槽的deadline
                // The timeout was placed into a wrong slot. This should never happen.
                throw new IllegalStateException(String.format(
                    "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
            }
        } else if (timeout.isCancelled()) {
            next = remove(timeout);
        } else {
            //因為當前的槽位已經過了,說明已經走了一圈了,把輪數減一
            timeout.remainingRounds--;
        }
        //把指標放置到下一個timeout
        timeout = next;
    }
}