產品中需要向不同的客戶推播資料,原來的實現是每條資料產生後就立即向客戶推播資料,走的的是HTTP協定。因為每條資料都比較小,而資料生成的頻次也比較高,這就會頻繁的建立HTTP連線,而且每次HTTP傳輸中攜帶的業務資料都很小,對網路的實際利用率不高。希望能夠提高網路的利用率,並降低系統的負載。
一個很自然的想法就是將多條資料一起傳送,這裡有幾個關鍵點:
1、多條資料的聚合邏輯: 是攢夠幾條傳送,還是按照時間週期傳送。如果是攢夠幾條傳送,在資料比較稀疏或者產生頻率不那麼穩定的時候,攢夠需要的資料條數可能比較困難,這時候還得需要一個過期時間,因為客戶可能接受不了太多的延遲。既然不管怎樣都需要使用時間進行控制,我這裡索性就選擇按照時間週期傳送了。思路是:自上次傳送時間起,經過了某個時長之後,就傳送客戶在這段時間內產生的所有資料。
2、資料到期判斷方法:既然選擇了按照時間週期傳送,那麼就必須有辦法判斷是否到了傳送時間。一個很簡單的想法就是輪詢,把所有客戶輪詢一遍,看看誰的資料到期了,就傳送誰的。這個演演算法的時間複雜度是O(N),如果客戶比較多,就會消耗過多的時間在這上邊。還有一個辦法:如果客戶按照時間排序好了,那麼只需要取時間最早的客戶的資料時間判斷就好了,滿足就傳送,一直向後找,直到獲取的客戶資料時間不符合條件,則退出處理,然後等一會再進行判斷處理。這就需要有一個支援排序的資料結構,寫入資料時自動排序,這種資料結構的時間複雜度一般可以做到O(log(n))。對於這個資料結構的讀寫操作原理上就是佇列的操作方式,只不過是個可排序的佇列。
3、區分客戶:不同客戶的資料接收地址不同,向具體某個客戶傳送資料時,應該能比較方便的聚合他的資料,最好是直接就能拿到需要傳送的資料。可以使用字典資料結構來滿足這個需求,取某個客戶資料的時間複雜度可以降低到O(1)。
4、資料的安全性問題:如果程式在資料傳送成功之前退出了,未傳送的資料怎麼辦?是還能繼續傳送,還是就丟掉不管了。如果要在程式重啟後恢復未傳送成功的資料,則必須將資料同步到別的地方,比如持久化到磁碟。因為我這裡的資料安全性要求不高,丟失一些資料也是允許的,所以要傳送的資料收到之後放到記憶體就行了。
上文提到可排序的資料結構,可以使用SortedList<TKey,TValue>,鍵是時間,值是這個時間產生了資料的客戶標識列表。不過它的讀寫操作不是執行緒安全的,需要自己做同步,這裡簡單點就使用lock了。
對於不同客戶的資料,為了方便獲取,使用Dictionary<TKey,TValue>來滿足,鍵是客戶的標識,值是累積的未傳送客戶資料。這個資料讀寫也不是執行緒安全的,可以和SortedList的讀寫放到同一個lock中。
下邊是它們的定義:
SortedList<DateTime, List<TKey>> _queue = new SortedList<DateTime, List<TKey>>();
Dictionary<TKey, List<TValue>> _data = new Dictionary<TKey, List<TValue>>();
readonly object _lock = new object();
插入資料的時候,需要先寫入SortedList,然後再寫入Dictionary。程式碼邏輯比較簡單,請看:
public void Publish(TKey key, TValue value)
{
DateTime now = DateTime.Now;
lock (_lock)
{
if (_queue.TryGetValue(now, out List<TKey>? keys))
{
if (!keys!.Contains(key))
{
keys.Add(key);
}
}
else
{
_queue.Add(now, new List<TKey> { key });
}
if (_data.TryGetValue(key, out List<TValue>? values))
{
values.Add(value);
}
else
{
_data.Add(key, new List<TValue> { value });
}
}
}
對於消費資料,這裡採用拉資料的模式。最開始寫的方法邏輯是:讀取一條資料,處理它,然後從佇列中刪除。但是這個邏輯需要對佇列進行讀寫,所以必須加鎖。一般處理資料比較耗時,比如這裡要通過HTTP傳送資料,加鎖的話就可能導致寫資料到佇列時阻塞的時間比較長。所以這裡實現的是把可以傳送的資料全部提取出來,然後就釋放鎖,資料的處理放到鎖的外部實現,這樣佇列的讀寫效能就比較好了。
public List<(TKey key, List<TValue> value)> Pull(int maxNumberOfMessages)
{
List<(TKey, List<TValue>)> result = new List<(TKey, List<TValue>)>();
DateTime now = DateTime.Now;
lock (_lock)
{
int messageCount = 0;
while (true)
{
if (!_queue.Any())
{
break;
}
var first = _queue.First();
var diffMillseconds = now.Subtract(first.Key).TotalMilliseconds;
if (diffMillseconds < _valueDequeueMillseconds)
{
break;
}
var keys = first.Value;
foreach (var key in keys)
{
if (_data.TryGetValue(key, out List<TValue>? keyValues))
{
result.Add((key, keyValues));
_data.Remove(key);
messageCount += keyValues!.Count;
}
}
_queue.RemoveAt(0);
if (messageCount >= maxNumberOfMessages)
{
break;
}
}
}
return result;
}
這段程式碼比較長一些,我梳理下邏輯:取佇列的第一條資料,判斷時間是否達到傳送週期,未達到則直接退出,方法返回空列表。如果達到傳送週期,則取出第一條資料中儲存的客戶標識,然後根據這些標識獲取對應的客戶未傳送資料,將這些資料按照客戶維度新增到返回列表中,將這些客戶及其資料從佇列中移除,返回有資料的列表。這裡還增加了一個拉取資料的條數限制,方便根據業務實際情況進行控制。
再來看一下怎麼使用這個佇列,這裡模擬多個生產者加一個消費者,其實可以任意多個生產者和消費者:
TimeSortedQueue<string, string> queue = new TimeSortedQueue<string, string>(3000);
List<Task> publishTasks = new List<Task>();
for (int i = 0; i < 4; i++)
{
var j = i;
publishTasks.Add(Task.Factory.StartNew(() =>
{
int k = 0;
while (true)
{
queue.Publish($"key_{k}", $"value_{j}_{k}");
Thread.Sleep(15);
k++;
}
}, TaskCreationOptions.LongRunning));
}
Task.Factory.StartNew(() =>
{
while (true)
{
var list = queue.Pull(100);
if (list.Count <= 0)
{
Thread.Sleep(100);
continue;
}
foreach (var item in list)
{
Console.WriteLine($"{DateTime.Now.ToString("mmss.fff")}:{item.key}, {string.Join(",", item.value)}");
}
}
}, TaskCreationOptions.LongRunning);
Task.WaitAll(publishTasks.ToArray());
以上就是針對這個特定需求實現的一個按照時間進行排序的佇列。
萬物皆可排序的佇列
我們很容易想到,既然可以按照時間排序,那麼按照別的資料型別排序也是可以的。這個資料結構可以應用的場景很多,比如按照權重排序的佇列、按照優先順序排序的佇列、按照年齡排序的佇列、按照銀行存款排序的佇列,等等。這就是一個萬物皆可排序的佇列。
我這裡把主要程式碼貼出來(完整程式碼和範例請看文末):
public class SortedQueue<TSortKey, TKey, TValue>
where TSortKey : notnull, IComparable
where TKey : notnull
where TValue : notnull
{
Dictionary<TKey, List<TValue>> _data = new Dictionary<TKey, List<TValue>>();
SortedList<TSortKey, List<TKey>> _queue = new SortedList<TSortKey, List<TKey>>();
readonly object _lock = new object();
/// <summary>
/// Create a new instance of SortedQueue
/// </summary>
public SortedQueue(int maxNumberOfMessageConsumedOnce)
{
}
/// <summary>
/// Publish a message to queue
/// </summary>
/// <param name="sortKey">The key in the queue for sorting. Different messages can use the same key.</param>
/// <param name="key">The message key.</param>
/// <param name="value">The message value.</param>
public void Publish(TSortKey sortKey, TKey key, TValue value)
{
lock (_lock)
{
if (_queue.TryGetValue(sortKey, out List<TKey>? keys))
{
keys.Add(key);
}
else
{
_queue.Add(sortKey, new List<TKey> { key });
}
if (_data.TryGetValue(key, out List<TValue>? values))
{
values.Add(value);
}
else
{
_data.Add(key, new List<TValue> { value });
}
}
}
/// <summary>
/// Pull a batch of messages.
/// </summary>
/// <param name="maxNumberOfMessages">The maximum number of pull messages.</param>
/// <returns></returns>
public List<(TKey Key, List<TValue> Value)> Pull(int maxNumberOfMessages)
{
List<(TKey, List<TValue>)> result = new List<(TKey, List<TValue>)>();
lock (_lock)
{
int messageCount = 0;
while (true)
{
if (!_queue.Any())
{
break;
}
var keys = _queue.First().Value;
foreach (var key in keys)
{
if (_data.TryGetValue(key, out List<TValue>? keyValues))
{
result.Add((key, keyValues));
_data.Remove(key);
messageCount += keyValues!.Count;
}
}
_queue.RemoveAt(0);
if (messageCount >= maxNumberOfMessages)
{
break;
}
}
}
return result;
}
}
程式碼邏輯還是比較簡單的,就不羅嗦了,如有問題歡迎留言交流。
因為在這個實現中所有待處理的資料都在記憶體中,丟失資料會帶來一定的風險,因為我這個程式前邊還有一個佇列,即使程式崩潰了,也只損失沒處理的一小部分資料,業務上可以接受,所以這樣做沒有問題。如果你對這個程式感興趣,需要慎重考慮你的應用場景。
來看看資料丟失可能發生的兩種情況:
一是資料還在佇列中時程式重啟了:對於這種情況,前文提到將資料同步到其它地方,比如寫入Redis、寫入資料庫、寫入磁碟等等。不過因為網路IO、磁碟IO較慢,這往往會帶來吞吐量的大幅下降,想要保證一定的吞吐量,還得引入一些分片機制,又因為分散式的不可靠,可能還得增加一些容錯容災機制,比較複雜,可以參考Kafka。
二是資料處理的時候失敗了:對於這種情況,可以讓程式重試;但是如果異常導致程式崩潰了,資料已經從記憶體或者其它儲存中移除了,資料還是會發生丟失。這時候可以採用一個ACK機制,處理成功後向佇列傳送一個ACK,攜帶已經處理的資料標識,佇列根據標識刪除資料。否則消費者還能消費到這些資料。
這些問題並不一定要完全解決,還是得看業務場景,有可能你把資料持久化到Redis就夠了,或者你也不用引入ACK機制,記錄下處理到哪一條了就行了。
以上就是本文的主要內容了,完整程式碼和範例請存取Github:https://github.com/bosima/dotnet-demo/tree/main/CSharp-SortedList