TimerQueue 是.NET中實現定時任務的核心元件,它是一個定時任務的管理器,負責儲存和排程定時任務。它被用於實現很多 .NET 中的定時任務,比如 System.Threading.Timer、Task.Delay、CancellationTokenSource 等。
筆者將用兩篇文章為大家介紹 TimerQueue 的實現原理,本篇文章將以 System.Threading.Timer 為入口,揭祕 TimerQueue 對定時任務基本單元 TimerQueueTimer
的管理和排程,下一篇文章將介紹 TimerQueue 又是如何通過 native timer 被觸發的。
本文將基於.NET 7.0 原始碼揭祕 TimerQueue 的實現原理。
https://github.com/dotnet/runtime/blob/release/7.0/src/libraries/System.Private.CoreLib/src/System/Threading/Timer.cs
Timer 的使用方式非常簡單,只需要建立一個 Timer 範例並通過建構函式指定回撥函數和回撥函數的引數即可。建構函式有幾個不同的過載,可以用不同的時間單位指定 Timer 的首次到期時間 dueTime 和間隔時間 period,這邊以 TimeSpan 為例:
new Timer(callback: TimerCallback, state: "Hello World", dueTime: TimeSpan.FromSeconds(1), period: TimeSpan.FromSeconds(2));
Console.WriteLine("Started timer at {0}", DateTime.Now);
void TimerCallback(object? state)
{
Console.WriteLine("TimerCallback: state = {0} at {1}", state, DateTime.Now);
}
Console.ReadKey();
輸出結果如下:
Started timer at 2023/7/3 21:09:10
TimerCallback: state = Hello World at 2023/7/3 21:09:11
TimerCallback: state = Hello World at 2023/7/3 21:09:13
TimerCallback: state = Hello World at 2023/7/3 21:09:15
TimerCallback: state = Hello World at 2023/7/3 21:09:17
TimerCallback: state = Hello World at 2023/7/3 21:09:19
上面的程式碼建立了一個 Timer 範例,該 Timer 範例會在 1s 後第一次執行回撥函數,之後每隔 2s 執行一次回撥函數。
此外,Timer 還有一個 Change 方法,可以用來修改 Timer 的 dueTime 和 period。
Timer timer = new Timer(callback: TimerCallback, state: "Hello World", dueTime: TimeSpan.FromSeconds(1), period:
TimeSpan.FromSeconds(2));
timer.Change(dueTime: TimeSpan.FromSeconds(2), period: TimeSpan.FromSeconds(3));
Console.WriteLine("Started timer at {0}", DateTime.Now);
void TimerCallback(object? state)
{
Console.WriteLine("TimerCallback: state = {0} at {1}", state, DateTime.Now);
}
Console.ReadKey();
上面的程式碼通過 Change 方法修改了 Timer 的 dueTime 和 period,使得 Timer 會在 2s 後開始執行回撥函數,每隔 3s 執行一次回撥函數。
Timer 的實現主要由以下三個類組成:
TimerQueue 是核心的實現。
public sealed class Timer : MarshalByRefObject, IDisposable, IAsyncDisposable
{
internal TimerHolder _timer;
public Timer(TimerCallback callback,
object? state,
TimeSpan dueTime,
TimeSpan period)
{
long dueTm = (long)dueTime.TotalMilliseconds;
long periodTm = (long)period.TotalMilliseconds;
TimerSetup(callback, state, (uint)dueTm, (uint)periodTm);
}
private void TimerSetup(TimerCallback callback,
object? state,
uint dueTime,
uint period,
bool flowExecutionContext = true)
{
// 實際的任務會被封裝到 TimerQueueTimer 中,並由 TimerHolder 管理其生命週期
_timer = new TimerHolder(new TimerQueueTimer(callback, state, dueTime, period, flowExecutionContext));
}
}
internal sealed class TimerQueueTimer : IThreadPoolWorkItem
{
internal TimerQueueTimer(TimerCallback timerCallback, object? state, uint dueTime, uint period, bool flowExecutionContext)
{
_timerCallback = timerCallback;
_state = state;
_dueTime = Timeout.UnsignedInfinite;
_period = Timeout.UnsignedInfinite;
if (flowExecutionContext)
{
_executionContext = ExecutionContext.Capture();
}
// 分配一個 TimerQueue
_associatedTimerQueue = TimerQueue.Instances[Thread.GetCurrentProcessorId() % TimerQueue.Instances.Length];
if (dueTime != Timeout.UnsignedInfinite)
Change(dueTime, period);
}
internal bool Change(uint dueTime, uint period, bool throwIfDisposed = true)
{
bool success;
lock (_associatedTimerQueue)
{
// ... 省略部分程式碼
// 將 TimerQueueTimer 新增到 TimerQueue 中
success = _associatedTimerQueue.UpdateTimer(this, dueTime, period);
}
return success;
}
void IThreadPoolWorkItem.Execute() => Fire(isThreadPool: true);
internal void Fire(bool isThreadPool = false)
{
// ... 省略部分程式碼
// 呼叫 TimerCallback
_timerCallback(_state);
// ... 省略部分程式碼
}
}
Timer 的任務會被封裝到 TimerQueueTimer 中,並由 TimerHolder 管理其生命週期。
TimerQueue 是實現定時任務的核心,它負責儲存和排程 TimerQueueTimer 範例。
TimerQueueTimer 是 TimerQueue 的基本任務單元,它封裝了待執行的任務。TimerQueueTimer 實現了 IThreadPoolWorkItem 介面,可以交給執行緒池排程執行。
internal sealed class TimerQueueTimer : IThreadPoolWorkItem
{
// 繫結的 TimerQueue
private readonly TimerQueue _associatedTimerQueue;
// TimerQueueTimer 被儲存在 TimerQueue 中,_next 和 _prev 用於構成 TimerQueue 的雙向連結串列
internal TimerQueueTimer? _next;
internal TimerQueueTimer? _prev;
// 是否是儲存在 TimerQueue 的 shortTimers 連結串列中
internal bool _short;
// 此次任務的開始時間
internal long _startTicks;
internal uint _dueTime;
internal uint _period;
// 回撥函數
private readonly TimerCallback _timerCallback;
// 回撥函數的引數
private readonly object? _state;
// 繫結的 ExecutionContext
private readonly ExecutionContext? _executionContext;
// ... 省略
TimerQueueTimer 按照建立目的不同,可以分為兩類:
操作超時timer:這些timer被頻繁建立和銷燬,但幾乎很少觸發。這些timer的目的是僅在發生故障時觸發。它們作為一種故障安全機制,允許系統檢測和處理異常情況。比如用於檢測某個http介面呼叫是否超時。
後臺定時任務timer:這些timer被設計為在特定間隔或特定時間觸發。與超時timer不同,這些timer是確實會觸發的。
對超時timer而言,更多地是要考慮建立和銷燬的效能。而對於後臺定時任務timer,這類任務通常是長期執行的,觸發上稍微多一些時間開銷,就其總執行時間而言,是可以忽略不計的。
TimerQueue 的設計更多地是考慮建立和銷燬的效能,而不是觸發的效能,也就是更多地針對 TimerQueueTimer 的插入和刪除操作進行優化。
因此 TimerQueue 將 TimerQueueTimer 儲存在雙向連結串列中,插入和刪除都是 O(1) 的時間複雜度。
雖然 TimerQueue 更多地是考入插入和刪除的效能,但是遍歷連結串列的效能也是有被考慮的。
TimerQueue 使用 OS 提供的 native timer 來實現定時任務的排程,當 native timer 觸發時,TimerQueue 會遍歷連結串列,觸發所有到期的 TimerQueueTimer。
為了提高遍歷 Timer連結串列 的效能,TimerQueue 會將 TimerQueueTimer 按照到期時間進行分成了兩組:shortTimers 和 longTimers。
TimerQueue 基於一個基本閾值維護了動態更新的參考時間點,在這個參考時間點之前到期的 TimerQueueTimer 會被分到 shortTimers 中,而在這個時間點之後到期的 TimerQueueTimer 會被分到 longTimers 中。
native timer 觸發時,TimerQueue 會先遍歷 shortTimers,如果遍歷完 shortTimers 後,當前的時間還沒到需要遍歷 longTimers 的時間,那麼 TimerQueue 會繼續等待,直到當前時間到達需要遍歷 longTimers 的時間,然後再遍歷 longTimers。
通過對 TimerQueueTimer 分類避免了每次都需要遍歷整個 TimerQueueTimer 連結串列,下文會更詳細地介紹這個演演算法。
TimerQueue 的初始化是在 TimerQueue.Instances 屬性的 getter 方法中完成的,TimerQueue.Instances 屬性是一個靜態屬性,它會在第一次存取時初始化。
TimerQueue 的數量是根據當前機器的 CPU 核心數來決定的,每個 TimerQueue 對應一個 CPU 核心。
internal sealed class TimerQueue: IThreadPoolWorkItem
{
public static TimerQueue[] Instances { get; } = CreateTimerQueues();
private static TimerQueue[] CreateTimerQueues()
{
var queues = new TimerQueue[Environment.ProcessorCount];
for (int i = 0; i < queues.Length; i++)
{
queues[i] = new TimerQueue(i);
}
return queues;
}
}
internal sealed class TimerQueueTimer: IThreadPoolWorkItem
{
internal TimerQueueTimer(TimerCallback timerCallback, object? state, uint dueTime, uint period, bool flowExecutionContext)
{
_timerCallback = timerCallback;
_state = state;
_dueTime = Timeout.UnsignedInfinite;
_period = Timeout.UnsignedInfinite;
if (flowExecutionContext)
{
_executionContext = ExecutionContext.Capture();
}
// 每個 CPU 核心對應一個 TimerQueue, TimerQueueTimer 根據當前執行緒所在的 CPU 核心來決定加入哪個 TimerQueue
_associatedTimerQueue = TimerQueue.Instances[Thread.GetCurrentProcessorId() % TimerQueue.Instances.Length];
if (dueTime != Timeout.UnsignedInfinite)
Change(dueTime, period);
}
internal bool Change(uint dueTime, uint period, bool throwIfDisposed = true)
{
bool success;
lock (_associatedTimerQueue)
{
if (_canceled)
return throwIfDisposed ? throw new ObjectDisposedException(null, SR.ObjectDisposed_Generic) : false;
_period = period;
// 如果 dueTime == Timeout.UnsignedInfinite,表示 TimerQueueTimer 被取消,需要從 TimerQueue 中移除
if (dueTime == Timeout.UnsignedInfinite)
{
_associatedTimerQueue.DeleteTimer(this);
success = true;
}
else
{
// 表示 TimerQueueTimer 加入 TimerQueue 中或更新 TimerQueueTimer 在 TimerQueue 中的位置及到期時間
success = _associatedTimerQueue.UpdateTimer(this, dueTime, period);
}
}
return success;
}
}
在 TimerQueueTimer 關聯上 TimerQueue 後,接著就是呼叫 TimerQueue 的 UpdateTimer 方法,將 TimerQueueTimer 加入到 TimerQueue 中,或者更新 TimerQueueTimer 在 TimerQueue 中的位置及到期時間。
TimerQueue.UpdateTimer 被呼叫的時機有兩個:
TimerQueue 維護了一個參考時間點 currentAbsoluteThreshold,到期時間在這個時間點之前的 TimerQueueTimer 會被分到 shortTimers 中,而到期時間在這個時間點之後的
TimerQueueTimer 會被分到 longTimers 中。
TimerQueue 定義了一個閾值 ShortTimersThresholdMilliseconds = 333ms,參考時間點 currentAbsoluteThreshold 的初始值就是 TimerQueue 建立時間 +
這個閾值 333ms。每次 shortTimers 被遍歷完一輪後,currentAbsoluteThreshold 會被更新為當前時間點 + 333ms。
為方便理解,下文用參考時間點來代指 currentAbsoluteThreshold。
internal sealed class TimerQueue: IThreadPoolWorkItem
{
// 系統啟動時間作為基準時間,這邊每個平臺的實現略有不同
https://github.com/dotnet/runtime/blob/release/7.0/src/libraries/System.Private.CoreLib/src/System/Threading/TimerQueue.Unix.cs
https://github.com/dotnet/runtime/blob/release/7.0/src/libraries/System.Private.CoreLib/src/System/Threading/TimerQueue.Windows.cs
private static long TickCount64 => Environment.TickCount64;
// 決定是否將 TimerQueueTimer 新增到 shortTimers 中的閾值,333ms
private const int ShortTimersThresholdMilliseconds = 333;
// 參考時間點,到期時間在這個時間點之前的 TimerQueueTimer 會被分到 shortTimers 中,而到期時間在這個時間點之後的 TimerQueueTimer 會被分到 longTimers 中
private long _currentAbsoluteThreshold = TickCount64 + ShortTimersThresholdMilliseconds;
// shortTimers 和 longTimers 都是雙向連結串列
private TimerQueueTimer? _shortTimers;
private TimerQueueTimer? _longTimers;
public bool UpdateTimer(TimerQueueTimer timer, uint dueTime, uint period)
{
long nowTicks = TickCount64;
// dueTime 表示 TimerQueueTimer 離到期的時間間隔
// 比較 TimerQueueTimer 的到期時間和 _currentAbsoluteThreshold,決定是否將 TimerQueueTimer 新增到 shortTimers 中
long absoluteDueTime = nowTicks + dueTime;
bool shouldBeShort = _currentAbsoluteThreshold - absoluteDueTime >= 0;
if (timer._dueTime == Timeout.UnsignedInfinite)
{
// _dueTime == Timeout.UnsignedInfinite 表示 timer 未被新增到 TimerQueue 中,只需要直接新增即可
timer._short = shouldBeShort;
LinkTimer(timer);
++ActiveCount;
}
else if (timer._short != shouldBeShort) // _short 表示 timer 是否需要被新增到 shortTimers 中
{
// 如果 timer 已經被新增到 TimerQueue 中,
// 但是 timer 應該被新增到另外一個連結串列中,
// 需要先將 TimerQueueTimer 從原來的連結串列中移除,然後再新增到新的連結串列中
UnlinkTimer(timer);
timer._short = shouldBeShort;
LinkTimer(timer);
}
timer._dueTime = dueTime;
timer._period = (period == 0) ? Timeout.UnsignedInfinite : period;
timer._startTicks = nowTicks;
// 確保 native timer 在 TimerQueueTimer 的到期時間之前觸發
return EnsureTimerFiresBy(dueTime);
}
}
TimerQueue 中的 shortTimers 和 longTimers 都是雙向連結串列,TimerQueueTimer 被新增到 TimerQueue 中時,會使用頭插法將 TimerQueueTimer 新增到 shortTimers 或 longTimers 中。
TimerQueue 對 TimerQueueTimer 的更新操作分為三種:
internal sealed class TimerQueue: IThreadPoolWorkItem
{
private TimerQueueTimer? _shortTimers;
private TimerQueueTimer? _longTimers;
public long ActiveCount { get; private set; }
private void LinkTimer(TimerQueueTimer timer)
{
// _short 表示 timer 是否需要被新增到 shortTimers 中
ref TimerQueueTimer? listHead = ref timer._short ? ref _shortTimers : ref _longTimers;
// 使用頭插法將 timer 新增到 shortTimers 或 longTimers 中
timer._next = listHead;
if (timer._next != null)
{
timer._next._prev = timer;
}
timer._prev = null;
listHead = timer;
}
private void UnlinkTimer(TimerQueueTimer timer)
{
// 如果 timer 不是 shortTimers 或 longTimers 的尾節點,需要更新 timer 後一個節點的 prev 指標
TimerQueueTimer? t = timer._next;
if (t != null)
{
t._prev = timer._prev;
}
// 如果 timer 是 shortTimers 或 longTimers 的頭結點,需要更新 shortTimers 或 longTimers 的頭結點
if (_shortTimers == timer)
{
_shortTimers = t;
}
else if (_longTimers == timer)
{
_longTimers = t;
}
// 如果 timer 不是 shortTimers 或 longTimers 的頭結點,需要更新 timer 前一個節點的 next 指標
t = timer._prev;
if (t != null)
{
t._next = timer._next;
}
}
public void DeleteTimer(TimerQueueTimer timer)
{
if (timer._dueTime != Timeout.UnsignedInfinite)
{
--ActiveCount;
UnlinkTimer(timer);
timer._prev = null;
timer._next = null;
timer._dueTime = Timeout.UnsignedInfinite;
timer._period = Timeout.UnsignedInfinite;
timer._startTicks = 0;
timer._short = false;
}
}
}
遍歷演演算法的關鍵是對 TimerQueue 中的 TimerQueueTimer 的分類處理。
從下面兩個維度對 TimerQueueTimer 進行分類:
TimerQueue 繫結的 native timer 到期後,會呼叫 TimerQueue 的 FireNextTimers 方法,FireNextTimers 方法會先遍歷 shortTimers,如果當前時間大於參考時間點
_currentAbsoluteThreshold,則會繼續遍歷 longTimers。
遍歷 shortTimers 的過程中需要判斷是一次性的 TimerQueueTimer 還是週期性的 TimerQueueTimer。
遍歷 longTimers 的過程中,會有兩種情況:
TimerQueueTimer 的回撥實際在哪觸發分為兩種情況:
下面是 TimerQueue 的 FireNextTimers 的大致流程圖:
internal sealed class TimerQueue: IThreadPoolWorkItem
{
// 將 TimerQueueTimer 從 shortTimers 或 longTimers 中移除,然後新增到另外一個佇列中
public void MoveTimerToCorrectList(TimerQueueTimer timer, bool shortList)
{
UnlinkTimer(timer);
timer._short = shortList;
LinkTimer(timer);
}
private void FireNextTimers()
{
// 第一次個 TimerQueueTimer 在當前執行緒執行,其他會被新增到執行緒池中
TimerQueueTimer? timerToFireOnThisThread = null;
lock (this)
{
_isTimerScheduled = false;
bool haveTimerToSchedule = false;
uint nextTimerDuration = uint.MaxValue;
long nowTicks = TickCount64;
// 分兩次遍歷,第一次遍歷 shortTimers,遍歷完 shortTimers 後,
// 如果當前時間大於參考時間點 _currentAbsoluteThreshold,則再遍歷 longTimers
TimerQueueTimer? timer = _shortTimers;
for (int listNum = 0; listNum < 2; listNum++) // short == 0, long == 1
{
while (timer != null)
{
TimerQueueTimer? next = timer._next;
long elapsed = nowTicks - timer._startTicks;
long remaining = timer._dueTime - elapsed;
if (remaining <= 0)
{
// 將當前 timer 標記為待觸發狀態
timer._everQueued = true;
if (timer._period != Timeout.UnsignedInfinite)
{
// 如果是週期性的 TimerQueueTimer,需要將 timer 重新新增到 shortTimers 或 longTimers 中
// 如果 週期 period 設定的過小,程式碼執行到這裡的時候,下一次觸發的時間已經過去了,就設定下一次觸發的時間為 1ms
timer._startTicks = nowTicks;
long elapsedForNextDueTime = elapsed - timer._dueTime;
timer._dueTime = (elapsedForNextDueTime < timer._period) ?
timer._period - (uint)elapsedForNextDueTime :
1;
// 在遍歷 shortTimers 的過程中,選出最小的 nextTimerDuration,用於後面重新系結 native timer
if (timer._dueTime < nextTimerDuration)
{
haveTimerToSchedule = true;
nextTimerDuration = timer._dueTime;
}
// 週期性的 TimerQueueTimer 會因為下次到期時間的變化而在 shortTimers 和 longTimers 之間移動
bool targetShortList = (nowTicks + timer._dueTime) - _currentAbsoluteThreshold <= 0;
if (timer._short != targetShortList)
{
MoveTimerToCorrectList(timer, targetShortList);
}
}
else
{
// 如果不是週期性的 TimerQueueTimer,直接將 timer 從 TimerQueue 中移除
DeleteTimer(timer);
}
// 第一個 TimerQueueTimer 在當前執行緒執行,其他會被新增到執行緒池中
if (timerToFireOnThisThread == null)
{
timerToFireOnThisThread = timer;
}
else
{
ThreadPool.UnsafeQueueUserWorkItemInternal(timer, preferLocal: false);
}
}
else
{
if (remaining < nextTimerDuration)
{
haveTimerToSchedule = true;
nextTimerDuration = (uint)remaining;
}
// !timer._short 表示目前遍歷的是 longTimers,如果 remaining <= ShortTimersThresholdMilliseconds,
if (!timer._short && remaining <= ShortTimersThresholdMilliseconds)
{
MoveTimerToCorrectList(timer, shortList: true);
}
}
timer = next;
}
// shortTimers 已經遍歷完,判斷是否需要遍歷 longTimers
if (listNum == 0)
{
// 計算當前時間和參考時間點 _currentAbsoluteThreshold 之間的時間差 remaining
// 如果 remaining > 0,說明還不需要遍歷 longTimers
long remaining = _currentAbsoluteThreshold - nowTicks;
if (remaining > 0)
{
if (_shortTimers == null && _longTimers != null)
// 因為沒有 shortTimers 了,下次遍歷 longTimers 時,需要重新計算 nextTimerDuration
// 因為沒辦法確定準確的 nextTimerDuration,所以這裡直接設定為 remaining
// +1 的延遲是為了能在下次觸發時, 是為了順帶處理掉 [_currentAbsoluteThreshold, _currentAbsoluteThreshold + 1] 之間的 TimerQueueTimer
// 避免這些 TimerQueueTimer 延後到下次觸發時才被處理
之間的 TimerQueueTimer
nextTimerDuration = (uint)remaining + 1;
haveTimerToSchedule = true;
}
break;
}
// 切換到 longTimers
timer = _longTimers;
// 更新參考時間點 _currentAbsoluteThreshold
_currentAbsoluteThreshold = nowTicks + ShortTimersThresholdMilliseconds;
}
}
// 重新系結native timer
if (haveTimerToSchedule)
{
EnsureTimerFiresBy(nextTimerDuration);
}
}
// 此次遍歷的第一個 TimerQueueTimer 在執行 FireNativeTimer 的執行緒上執行,減少執行緒切換
// 其他 TimerQueueTimer 在 ThreadPool 執行緒上執行
timerToFireOnThisThread?.Fire();
}
}
Task.Delay 和 用於超時取消的 CancellationTokenSource 也都是基於 TimerQueue 實現的。
Task.Delay 返回的 Task 是對 TimerQueueTimer 的封裝,當 TimerQueueTimer 觸發時,Task.Delay 返回的 Task 會被設定為完成狀態。
超時取消的 CancellationTokenSource 也是對 TimerQueueTimer 的封裝,當 TimerQueueTimer 觸發時,CancellationTokenSource 會被取消。
歡迎關注個人技術公眾號