Dubbo原始碼閱讀分享系列文章,歡迎大家關注點贊
時間是一種排程模型, 是一種高效的、批次管理定時任務的排程模型。時間輪一般會實現成一個環形結構,類似一個時鐘,分為很多槽,一個槽代表一個時間間隔,每個槽使用雙向連結串列儲存定時任務;指標週期性地跳動,跳動到一個槽位,就執行該槽位的定時任務。
至於為什麼使用時間輪我們可以對比下我們所熟知的資料結構來分析一下時間輪的優勢,這樣我們就明白為什麼會出現時間輪。
有序佇列的缺點在於新增/刪除任務,我們可以通過樹形結構來進行優化新增/刪除,也就是紅黑樹。
堆具有特點必須是完全二元樹,任一結點的值是其子樹所有結點的最大值或最小值
對於時間輪的實現一般是環狀+連結串列,這樣子整體複雜度為:
整體上看看上去我們可以選擇紅黑樹、最小堆、時間輪,但是如果是多執行緒情況,紅黑樹、最小堆執行操作需要鎖住整個內容,而時間輪就不需要,類似分段式鎖的概念,因此更優選擇是時間輪。
下圖是一個單層時間輪,假設下圖時間輪的週期是1秒,時間輪中有10個槽位,則每個槽位代表的時間就是100ms,現在有A、B、C三個任務,分別是任務A(230ms後執行)、B(450ms之後執行)、C(1950ms之後執行)。我們可以看到任務A被放到了槽位2,任務B被放到了槽位4,任務C被放到了槽位9,當時間輪轉動到對應的槽時,就會從槽中取出任務判斷是否需要執行。這個裡面涉及一個週期概念,任務C具有一個週期,當時間輪完成一次迴圈,下次執行到9的時候,任務C才會執行,目前Dubbo中採用單層時間輪機制。
對應多層時間輪就是具有多個時間輪,下圖中具有兩個時間輪,第一層時間輪還是保持和單層時間輪一樣,第二層時間輪為一個週期為10秒的時間輪,還是按照上述案例,這個時候A、B任務還是被分配在第一層時間輪,對於C任務,當完成完成一個週期以後,第二層時間輪刻度會執行到1的位置,同時任務C也會被取出到第一層時間輪9的位置,當一層時間輪再次轉動到9的位置的時候,則會觸發任務C,這種將第二層的任務取出放入第一層中稱為降層,它是為了保證任務被處理的時間精度。Kafka內部就是採用的這種多層時間輪機制。
Dubbo中時間輪的設計都位於org.apache.dubbo.common.timer包中,我們首先來看下核心介面的設計:
TimerTask封裝了要執行的任務,所有的定時任務都需要繼承TimerTask介面,TimerTask就是任務交接入口,該方法內部只有一個run方法,該方法接收一個Timeout型別。
Timeout與TimerTask一一對應,Timeout主要是為了獲取定時任務的狀態以及操作定時任務,Timeout與TimerTask兩者的關係類似於執行緒池返回的Future物件與提交到執行緒池中的任務物件之間的關係。
Timer介面定義了定時器的基本行為,核心是newTimeout方法:提交一個定時任務並返回關聯的Timeout物件。
HashedWheelTimeout是Timeout的唯一實現,它的作用有兩個:
首先來看下HashedWheelTimeout核心欄位,該核心欄位的設計表明連結串列的結構是一個雙向連結串列:
//初始化狀態
private static final int ST_INIT = 0;
//被取消狀態
private static final int ST_CANCELLED = 1;
//過期狀態
private static final int ST_EXPIRED = 2;
//更新定時任務的狀態
private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
//時間輪物件
private final HashedWheelTimer timer;
//實際執行的任務
private final TimerTask task;
//定時任務執行的時間
private final long deadline;
//預設狀態是初始化
@SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization"})
private volatile int state = ST_INIT;
//當前任務剩餘的時鐘週期數
long remainingRounds;
//當前定時任務在連結串列中的前驅節點和後繼節點 設計為一個雙向連結串列
HashedWheelTimeout next;
HashedWheelTimeout prev;
//時間輪中的一個槽
//每個槽維護一個雙向連結串列,當時間輪指標轉到當前槽時,就會從槽所負責的雙向連結串列中取出任務進行處理
HashedWheelBucket bucket;
HashedWheelTimeout核心方法介紹:
@Override
public boolean cancel() {
//CAS變更狀態
if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
return false;
}
//任務被取消時,時間輪會將它暫存到時間輪所維護的canceledTimeouts佇列中.
//當時間輪轉動到槽進行任務處理之前和時間輪退出執行時都會呼叫cancel,而
//cancel會呼叫remove,從而清理該佇列中被取消的定時任務
timer.cancelledTimeouts.add(this);
return true;
}
void remove() {
//獲取當前任務屬於哪個槽位
HashedWheelBucket bucket = this.bucket;
if (bucket != null) {
//從雙向連結串列中移除節點
bucket.remove(this);
} else {
//當前時間輪所維護的定時任務的數量
timer.pendingTimeouts.decrementAndGet();
}
}
public void expire() {
//CAS修改定時任務狀態為已過期
if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
return;
}
try {
//執行定時任務
task.run(this);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
}
}
}
HashedWheelBucket是時間輪中的一個槽,它內部維護了雙向連結串列的首尾指標,雙向連結串列中的每一個節點就是一個HashedWheelTimeout物件,同時關聯了一個TimerTask定時任務。
private HashedWheelTimeout head;
private HashedWheelTimeout tail;
HashedWheelBucket維護雙向連結串列的頭尾節點,可以遍歷整個連結串列,因此具備了維護任務的能力,接下來我們來看一下HashedWheelBucket的核心方法。
void addTimeout(HashedWheelTimeout timeout) {
//空判斷一下
assert timeout.bucket == null;
timeout.bucket = this;
//如果頭節點為空 說明整個連結串列為空 則設定頭尾為當前節點
if (head == null) {
head = tail = timeout;
} else {
//新增到未節點
tail.next = timeout;
timeout.prev = tail;
tail = timeout;
}
}
/**
* Expire all {@link HashedWheelTimeout}s for the given {@code deadline}.
*/
void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;
//時間輪指標轉到某個槽時從雙向連結串列頭節點開始遍歷
while (timeout != null) {
HashedWheelTimeout next = timeout.next;
//當前任務到期
if (timeout.remainingRounds <= 0) {
//移除
next = remove(timeout);
if (timeout.deadline <= deadline) {
//執行任務
timeout.expire();
} else {
// The timeout was placed into a wrong slot. This should never happen.
throw new IllegalStateException(String.format(
"timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}
} else if (timeout.isCancelled()) {
//任務被取消 被移除
next = remove(timeout);
} else {
//時鐘週期減一
timeout.remainingRounds--;
}
//判斷下一個節點
timeout = next;
}
}
void clearTimeouts(Set<Timeout> set) {
for (; ; ) {
HashedWheelTimeout timeout = pollTimeout();
if (timeout == null) {
return;
}
if (timeout.isExpired() || timeout.isCancelled()) {
continue;
}
set.add(timeout);
}
}
HashedWheelTimer實現了Timer介面,它通過時間輪演演算法實現了一個定時器。可以通過newTimeout方法可以向時間輪中新增定時任務,該任務會先被暫存到timeouts佇列中,等時間輪轉動到某個槽時,會將該timeouts佇列中的任務轉移到某個槽所負責的雙向連結串列中。從雙向連結串列的頭部開始迭代,對每個定時任務HashedWheelTimeout進行計算,屬於當前時鐘週期則取出執行,不屬於則將其剩餘的時鐘週期數減一操作。此外還提供停止時間輪的stop方法,以及判斷時間輪是否終止的方法。
//時間輪處理定時任務邏輯
private final Worker worker = new Worker();
//時間輪內部處理定時任務的執行緒
private final Thread workerThread;
private static final int WORKER_STATE_INIT = 0;
private static final int WORKER_STATE_STARTED = 1;
private static final int WORKER_STATE_SHUTDOWN = 2;
//時間輪狀態 0 - init, 1 - started, 2 - shut down
@SuppressWarnings({"unused", "FieldMayBeFinal"})
private volatile int workerState;
//時間輪每個槽所代表的時間
private final long tickDuration;
//時間輪的環形佇列,陣列每個元素都是一個槽,一個槽負責維護一個雙向連結串列,用於儲存定時任務
private final HashedWheelBucket[] wheel;
//wheel.length - 1
private final int mask;
//CountDownLatch保證執行緒已經啟動
private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
//外部向時間輪提交的定時任務
private final Queue<HashedWheelTimeout> timeouts = new LinkedBlockingQueue<>();
//用於暫存被取消的定時任務
private final Queue<HashedWheelTimeout> cancelledTimeouts = new LinkedBlockingQueue<>();
//時間輪剩餘的待處理的定時任務數量
private final AtomicLong pendingTimeouts = new AtomicLong(0);
//最多允許多少個任務等待執行
private final long maxPendingTimeouts;
//當前時間輪的啟動時間
private volatile long startTime;
時間輪的初始化是在HashedWheelTimer的建構函式中完成的,主要就是建立HashedWheelBucket陣列,以及建立workerThread工作執行緒,該執行緒就是負責處理時間輪中的定時任務的執行緒。
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel,
long maxPendingTimeouts) {
//引數校驗
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (tickDuration <= 0) {
throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
}
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
}
//圓環有多少時間間隔 將ticksPerWheel轉化為一個大於等於該值的2^n的值
wheel = createWheel(ticksPerWheel);
//快速計算槽的位置
mask = wheel.length - 1;
//時間輪每個槽的時間間隔
this.tickDuration = unit.toNanos(tickDuration);
//邊界值檢查
if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(String.format(
"tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
tickDuration, Long.MAX_VALUE / wheel.length));
}
//工作執行緒
workerThread = threadFactory.newThread(worker);
//最多允許多少個任務等待執行
this.maxPendingTimeouts = maxPendingTimeouts;
//限制timer範例個數,最大不超過64
if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
reportTooManyInstances();
}
}
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException(
"ticksPerWheel must be greater than 0: " + ticksPerWheel);
}
if (ticksPerWheel > 1073741824) {
throw new IllegalArgumentException(
"ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
}
//計算建立多少個槽
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
//初始化時間輪陣列
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
for (int i = 0; i < wheel.length; i++) {
wheel[i] = new HashedWheelBucket();
}
return wheel;
}
提交定時任務發生在初始化之後,由newTimeout方法完成任務提交,方法內部將待處理的任務數量加1,然後啟動時間輪執行緒,這時worker的run方法就會被系統排程執行。然後將該定時任務封裝成HashedWheelTimeout加入到timeouts佇列中。start之後,時間輪就開始執行起來了,直到外界呼叫stop方法終止退出。
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
if (task == null) {
throw new NullPointerException("task");
}
if (unit == null) {
throw new NullPointerException("unit");
}
//任務數加1
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
//判斷是否超過最大任務數
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts ("
+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
+ "timeouts (" + maxPendingTimeouts + ")");
}
//啟動時間輪
start();
//計算定時任務的deadline
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
//引數校驗
if (delay > 0 && deadline < 0) {
deadline = Long.MAX_VALUE;
}
//建立一個HashedWheelTimeout物件
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
//被暫存到timeouts佇列中
timeouts.add(timeout);
return timeout;
}
public void start() {
//判斷時間輪狀態
//1.如果是初始化, 則啟動worker執行緒, 啟動整個時間輪
//2. 如果已經啟動則略過
//3. 如果是已經停止,則報錯
switch (WORKER_STATE_UPDATER.get(this)) {
case WORKER_STATE_INIT:
if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
workerThread.start();
}
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}
//等待worker執行緒初始化時間輪的啟動時間
while (startTime == 0) {
try {
//countDownLatch來確保排程的執行緒已經被啟動
startTimeInitialized.await();
} catch (InterruptedException ignore) {
// Ignore - it will be ready very soon.
}
}
}
Worker實現了Runnable介面,也就是時間輪內部的工作執行緒,工作執行緒來處理放入時間輪中的定時任務。對於該方法核心就是run方法,
public void run() {
//初始化startTime時間輪初始化以後的時候
startTime = System.nanoTime();
if (startTime == 0) {
// We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
startTime = 1;
}
//喚醒阻塞執行緒
startTimeInitialized.countDown();
do {
//判斷是否到了處理槽的時間 如果沒到sleep
final long deadline = waitForNextTick();
if (deadline > 0) {
//獲取對應槽
int idx = (int) (tick & mask);
//清理使用者主動取消的定時任務
processCancelledTasks();
//獲取當前指標對應的槽位
HashedWheelBucket bucket =
wheel[idx];
//將timeouts佇列中的定時任務轉移到時間輪中對應的槽中
transferTimeoutsToBuckets();
//處理該槽位的雙向連結串列中的定時任務
bucket.expireTimeouts(deadline);
tick++;
}
//執行狀態一直迴圈
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
//執行到此處說明時間輪被停止了
//清除所有槽中的任務, 並加入到未處理任務列表
for (HashedWheelBucket bucket : wheel) {
bucket.clearTimeouts(unprocessedTimeouts);
}
//將還沒有加入到槽中的待處理定時任務佇列中的任務取出, 如果是未取消的任務,
//則加入到未處理任務佇列中
for (; ; ) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
if (!timeout.isCancelled()) {
unprocessedTimeouts.add(timeout);
}
}
//最後再次清理cancelledTimeouts佇列中使用者主動取消的定時任務
processCancelledTasks();
}
在Dubbo中有關於時間輪的應用有兩個核心的抽象類,一個是AbstractRetryTask,另外一個是AbstractTimerTask,關於AbstractRetryTask重試機制我們在註冊中心的時候已經介紹完成,這裡重點看下AbstractTimerTask實現。 關於AbstractTimerTask有是三個實現類,一個用來關閉連線,一個是心跳檢查,一個是重連線,AbstractTimerTask會呼叫不同的實現,我們來看下HeartbeatTimerTask實現,
protected void doTask(Channel channel) {
try {
//獲取最後一次讀寫時間
Long lastRead = lastRead(channel);
Long lastWrite = lastWrite(channel);
if ((lastRead != null && now() - lastRead > heartbeat)
|| (lastWrite != null && now() - lastWrite > heartbeat)) {
//最後一次讀寫時間超過心跳時間,就會傳送心跳請求
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setEvent(HEARTBEAT_EVENT);
//傳送心跳資訊
channel.send(req);
if (logger.isDebugEnabled()) {
logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
+ ", cause: The channel has no data-transmission exceeds a heartbeat period: "
+ heartbeat + "ms");
}
}
} catch (Throwable t) {
logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
}
}
歡迎大家點點關注,點點贊!