netty系列之:HashedWheelTimer一種定時器的高效實現

2022-06-20 21:02:05

簡介

定時器是一種在實際的應用中非常常見和有效的一種工具,其原理就是把要執行的任務按照執行時間的順序進行排序,然後在特定的時間進行執行。JAVA提供了java.util.Timer和java.util.concurrent.ScheduledThreadPoolExecutor等多種Timer工具,但是這些工具在執行效率上面還是有些缺陷,於是netty提供了HashedWheelTimer,一個優化的Timer類。

一起來看看netty的Timer有何不同吧。

java.util.Timer

Timer是JAVA在1.3中引入的。所有的任務都儲存在它裡面的TaskQueue中:

private final TaskQueue queue = new TaskQueue();

TaskQueue的底層是一個TimerTask的陣列,用於儲存要執行的任務。

private TimerTask[] queue = new TimerTask[128];

看起來TimerTask只是一個陣列,但是Timer將這個queue做成了一個平衡二元堆積。

當新增一個TimerTask的時候,會插入到Queue的最後面,然後呼叫fixup方法進行再平衡:

    void add(TimerTask task) {
        // Grow backing store if necessary
        if (size + 1 == queue.length)
            queue = Arrays.copyOf(queue, 2*queue.length);

        queue[++size] = task;
        fixUp(size);
    }

當從heap中移出執行的任務時候,會呼叫fixDown方法進行再平衡:

    void removeMin() {
        queue[1] = queue[size];
        queue[size--] = null;  // Drop extra reference to prevent memory leak
        fixDown(1);
    }

fixup的原理就是將當前的節點和它的父節點進行比較,如果小於父節點就和父節點進行互動,然後遍歷進行這個過程:

    private void fixUp(int k) {
        while (k > 1) {
            int j = k >> 1;
            if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime)
                break;
            TimerTask tmp = queue[j];  queue[j] = queue[k]; queue[k] = tmp;
            k = j;
        }
    }

fixDown的原理是比較當前節點和它的子節點,如果當前節點大於子節點,則將其降級:

    private void fixDown(int k) {
        int j;
        while ((j = k << 1) <= size && j > 0) {
            if (j < size &&
                queue[j].nextExecutionTime > queue[j+1].nextExecutionTime)
                j++; // j indexes smallest kid
            if (queue[k].nextExecutionTime <= queue[j].nextExecutionTime)
                break;
            TimerTask tmp = queue[j];  queue[j] = queue[k]; queue[k] = tmp;
            k = j;
        }
    }

二叉平衡堆的演演算法這裡不做詳細的介紹。大家可以自行查詢相關的文章。

java.util.concurrent.ScheduledThreadPoolExecutor

雖然Timer已經很好用了,並且是執行緒安全的,但是對於Timer來說,想要提交任務的話需要建立一個TimerTask類,用來封裝具體的任務,不是很通用。

所以JDK在5.0中引入了一個更加通用的ScheduledThreadPoolExecutor,這是一個執行緒池使用多執行緒來執行具體的任務。當執行緒池中的執行緒個數等於1的時候,ScheduledThreadPoolExecutor就等同於Timer。

ScheduledThreadPoolExecutor中進行任務儲存的是一個DelayedWorkQueue。

DelayedWorkQueue和DelayQueue,PriorityQueue一樣都是一個基於堆的資料結構。

因為堆需要不斷的進行siftUp和siftDown再平衡操作,所以它的時間複雜度是O(log n)。

下面是DelayedWorkQueue的shiftUp和siftDown的實現程式碼:

       private void siftUp(int k, RunnableScheduledFuture<?> key) {
            while (k > 0) {
                int parent = (k - 1) >>> 1;
                RunnableScheduledFuture<?> e = queue[parent];
                if (key.compareTo(e) >= 0)
                    break;
                queue[k] = e;
                setIndex(e, k);
                k = parent;
            }
            queue[k] = key;
            setIndex(key, k);
        }

        private void siftDown(int k, RunnableScheduledFuture<?> key) {
            int half = size >>> 1;
            while (k < half) {
                int child = (k << 1) + 1;
                RunnableScheduledFuture<?> c = queue[child];
                int right = child + 1;
                if (right < size && c.compareTo(queue[right]) > 0)
                    c = queue[child = right];
                if (key.compareTo(c) <= 0)
                    break;
                queue[k] = c;
                setIndex(c, k);
                k = child;
            }
            queue[k] = key;
            setIndex(key, k);
        }

HashedWheelTimer

因為Timer和ScheduledThreadPoolExecutor底層都是基於堆結構的。雖然ScheduledThreadPoolExecutor對Timer進行了改進,但是他們兩個的效率是差不多的。

那麼有沒有更加高效的方法呢?比如O(1)是不是可以達到呢?

我們知道Hash可以實現高效的O(1)查詢,想象一下假如我們有一個無限刻度的鐘表,然後把要執行的任務按照間隔時間長短的順序分配到這些刻度中,每當鐘錶移動一個刻度,即可以執行這個刻度中對應的任務,如下圖所示:

這種演演算法叫做Simple Timing Wheel演演算法。

但是這種演演算法是理論上的演演算法,因為不可能為所有的間隔長度都分配對應的刻度。這樣會耗費大量的無效記憶體空間。

所以我們可以做個折中方案,將間隔時間的長度先用hash進行處理。這樣就可以縮短間隔時間的基數,如下圖所示:

這個例子中,我們選擇8作為基數,間隔時間除以8,餘數作為hash的位置,商作為節點的值。

每次遍歷輪詢的時候,將節點的值減一。當節點的值為0的時候,就表示該節點可以取出執行了。

這種演演算法就叫做HashedWheelTimer。

netty提供了這種演演算法的實現:

public class HashedWheelTimer implements Timer 

HashedWheelTimer使用HashedWheelBucket陣列來儲存具體的TimerTask:

private final HashedWheelBucket[] wheel;

首先來看下建立wheel的方法:

    private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
        //ticksPerWheel may not be greater than 2^30
        checkInRange(ticksPerWheel, 1, 1073741824, "ticksPerWheel");

        ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
        HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
        for (int i = 0; i < wheel.length; i ++) {
            wheel[i] = new HashedWheelBucket();
        }
        return wheel;
    }

我們可以自定義wheel中ticks的大小,但是ticksPerWheel不能超過2^30。

然後將ticksPerWheel的數值進行調整,到2的整數倍。

然後建立ticksPerWheel個元素的HashedWheelBucket陣列。

這裡要注意,雖然整體的wheel是一個hash結構,但是wheel中的每個元素,也就是HashedWheelBucket是一個鏈式結構。

HashedWheelBucket中的每個元素都是一個HashedWheelTimeout. HashedWheelTimeout中有一個remainingRounds屬性用來記錄這個Timeout元素還會在Bucket中儲存多久。

long remainingRounds;

總結

netty中的HashedWheelTimer可以實現更高效的Timer功能,大家用起來吧。

更多內容請參考 http://www.flydean.com/50-netty-hashed-wheel-timer/

最通俗的解讀,最深刻的乾貨,最簡潔的教學,眾多你不知道的小技巧等你來發現!

歡迎關注我的公眾號:「程式那些事」,懂技術,更懂你!