我們經常會遇到這樣的資料處理應用場景:我們利用一個元件實時收集外部交付給它的資料,並由它轉發給一個外部處理程式進行處理。考慮到效能,它會將資料儲存在本地緩衝區,等累積到指定的數量後打包傳送;考慮到實時性,資料不能在緩衝區存太長的時間,必須設定一個延時時間,一旦超過這個時間,緩衝的資料必須立即發出去。看似簡單的需求,如果需要綜合考慮效能、執行緒安全、記憶體分配,要實現起來還真有點麻煩。這個問題有不同的解法,本文提供一種實現方案。
一、範例演示
二、待處理的批次資料:Batch<T>
三、感知資料處理的時機:BatchChangeToken
四、接收、緩衝、打包和處理資料:Batcher<T>
我們先來看看最終達成的效果。在如下這段程式碼中,我們使用一個Batcher<string>物件來接收應用分發給它的資料,該物件最終會在適當的時機處理它們。 呼叫Batcher<string>建構函式的三個引數分別表示:
var batcher = new Batcher<string>( processor:Process, batchSize:10, interval: TimeSpan.FromSeconds(5)); var random = new Random(); while (true) { var count = random.Next(1, 4); for (var i = 0; i < count; i++) { batcher.Add(Guid.NewGuid().ToString()); } await Task.Delay(1000); } static void Process(Batch<string> batch)=> Console.WriteLine($"[{DateTimeOffset.Now}]{batch.Count} items are delivered.");
如上面的程式碼片段所示,在一個迴圈中,我們每隔1秒鐘隨機新增1-3個資料項。從下圖中可以看出,Process方法的呼叫具有兩種觸發條件,一是累積的資料量達到設定的閾值10,另一個則是當前時間與上一次處理時間間隔超過5秒。
除了上面範例涉及的Batcher<T>,該解決方案還涉及兩個額外的型別,如下這個Batch<T>型別表示最終傳送的批次資料。為了避免緩衝資料帶來的記憶體分配,我們使用了一個單獨的ArrayPool<T>物件來建立池化的陣列,這個功能體現在靜態方法CreatePooledArray方法上。由於構建Batch<T>物件提供的陣列來源於物件池,在處理完畢後必須迴歸物件池,所以我們讓這個型別實現了IDisposable介面,並將這一操作實現在Dispose方法種。在呼叫ArrayPool<T>物件的Return方法時,我們特意將陣列清空。由於提供的陣列來源於物件池,所以並不能保證每個資料元素都承載了有效的資料,實現的迭代器和返回數量的Count屬性對此作了相應的處理。
public sealed class Batch<T> : IEnumerable<T>, IDisposable where T : class { private bool _isDisposed; private int? _count; private readonly T[] _data; private static readonly ArrayPool<T> _pool = ArrayPool<T>.Create(); public int Count { get { if (_isDisposed) throw new ObjectDisposedException(nameof(Batch<T>)); if(_count.HasValue) return _count.Value; var count = 0; for (int index = 0; index < _data.Length; index++) { if (_data[index] is null) { break; } count++; } return (_count = count).Value; } } public Batch(T[] data) => _data = data ?? throw new ArgumentNullException(nameof(data)); public void Dispose() { _pool.Return(_data, clearArray: true); _isDisposed = true; } public IEnumerator<T> GetEnumerator() => new Enumerator(this); IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); public static T[] CreatePooledArray(int batchSize) => _pool.Rent(batchSize); private void EnsureNotDisposed() { if (_isDisposed) throw new ObjectDisposedException(nameof(Batch<T>)); } private sealed class Enumerator : IEnumerator<T> { private readonly Batch<T> _batch; private readonly T[] _data; private int _index = -1; public Enumerator(Batch<T> batch) { _batch = batch; _data = batch._data; } public T Current { get { _batch.EnsureNotDisposed(); return _data[_index]; } } object IEnumerator.Current => Current; public void Dispose() { } public bool MoveNext() { _batch.EnsureNotDisposed(); return ++_index < _data.Length && _data[_index] is not null; } public void Reset() { _batch.EnsureNotDisposed(); _index = -1; } } }
Batcher具有兩個觸發資料處理的設定:緩衝的資料量和兩次資料處理之間的最長間隔。當累積的資料量或者當前時間與上一次處理的間隔達到閾值,緩衝的資料將自動被處理。.NET Core經常利用一個IChangeToken作為通知的令牌,為此我們定義瞭如下這個實現了該介面的BatchChangeToken型別。如下面的程式碼片段所示,上述兩個觸發條件體現在兩個CancellationToken物件上,我們利用它們建立了對應的CancellationChangeToken物件,最後利用這兩個CancellationChangeToken建立了一個CompositeChangeToken物件。這個CompositeChangeToken物件最終被用來實現了IChangeToken介面的三個成員。
internal sealed class BatchChangeToken : IChangeToken { private readonly IChangeToken _innerToken; private readonly int _countThreshold; private readonly CancellationTokenSource _expirationTokenSource; private readonly CancellationTokenSource _countTokenSource; private int _counter; public BatchChangeToken(int countThreshold, TimeSpan timeThreshold) { _countThreshold = countThreshold; _countTokenSource = new CancellationTokenSource(); _expirationTokenSource = new CancellationTokenSource(timeThreshold); var countToken = new CancellationChangeToken(_countTokenSource.Token); var expirationToken = new CancellationChangeToken(_expirationTokenSource.Token); _innerToken = new CompositeChangeToken(new IChangeToken[] { countToken, expirationToken }); } public bool HasChanged => _innerToken.HasChanged; public bool ActiveChangeCallbacks => _innerToken.ActiveChangeCallbacks; public IDisposable RegisterChangeCallback(Action<object?> callback, object? state) => _innerToken.RegisterChangeCallback(s => { callback(s); _countTokenSource.Dispose(); _expirationTokenSource.Dispose(); }, state); public void Increase() { Interlocked.Increment(ref _counter); if (_counter >= _countThreshold) { _countTokenSource.Cancel(); } } }
上述兩個CancellationToken來源於對應的CancellationTokenSource,對應的欄位為_countTokenSource和_expirationTokenSource。_expirationTokenSource根據設定的資料處理時間間隔建立而成。為了確定緩衝的資料量,我們提供了一個計數器,並利用Increase方法進行計數。在超過設定的資料量時,該方法會呼叫_expirationTokenSource的Cancel方法。在實現的ActiveChangeCallbacks方法種,我們將針對這兩個CancellationTokenSource的釋放放在註冊的回撥中。
最終用於打包的Batcher型別定義如下。在建構函式中,我們除了提供上述兩個閾值外,還提供了一個Action<Batch<T>>委託完成針對打包資料的處理。通過Add方法接收的資料儲存在_data欄位返回的陣列上,它時通過Batch<T>的靜態方法CreatePooledArray提供的。我們使用欄位_index表示新增資料在_data陣列中儲存的位置,並使用InterLocked.Increase方法解決並行問題。
public sealed class Batcher<T> : IDisposable where T : class { private readonly Action<Batch<T>> _processor; private T[] _data; private BatchChangeToken _changeToken = default!; private readonly int _batchSize; private int _index = -1; private readonly IDisposable _scheduler; public Batcher(Action<Batch<T>> processor, int batchSize, TimeSpan interval) { _processor = processor ?? throw new ArgumentNullException(nameof(processor)); _batchSize = batchSize; _data = Batch<T>.CreatePooledArray(batchSize); _scheduler = ChangeToken.OnChange(() => _changeToken = new BatchChangeToken(_batchSize, interval), OnChange); void OnChange() { var data = Interlocked.Exchange(ref _data, Batch<T>.CreatePooledArray(batchSize)); if (data[0] is not null) { Interlocked.Exchange(ref _index, -1); _ = Task.Run(() => _processor.Invoke(new Batch<T>(data))); } } } public void Add(T item) { if (item is null) throw new ArgumentNullException(nameof(item)); var index = Interlocked.Increment(ref _index); if (index >= _batchSize) { SpinWait.SpinUntil(() => _index < _batchSize - 1); Add(item); } _data[index] = item; _changeToken.Increase(); } public void Dispose() => _scheduler.Dispose(); }
在建構函式中,我們呼叫了ChangeToken的靜態方法OnChange將資料處理操作繫結到建立的BatchChangeToken物件上,並確保每次傳送「資料處理」後將重新建立的BatchChangeToken物件賦值到_changeToken欄位上,因為Add放到需要呼叫它的Increase增加計數。當接收到資料處理通知後,我們會呼叫Batch<T>的靜態方法CreatePooledArray構建一個陣列將欄位 _data參照的陣列替換下來,並將其封裝成Batch<T>物件進行處理(如果資料存在)。於此同時,表示新增資料儲存索引的_index恢復成-1。Add方法在對_index做自增操作後,如果發現累積的資料量達到閾值,需要等待資料處理完畢。由於資料處理以非同步的方式處理,這裡的耗時時很低的,所以我們這裡選擇了自旋的方式等待它完成。