該原文是Ayende Rahien大佬業餘自己在使用C# 和 .NET構建一個簡單、高效能相容Redis協定的資料庫的經歷。
首先這個"Redis"是非常簡單的實現,但是他在優化這個簡單"Redis"路程很有趣,也能給我們在從事效能優化工作時帶來一些啟示。
原作者:Ayende Rahien
原連結:https://ayende.com/blog/197473-C/high-performance-net-building-a-redis-clone-architecture
在之前的文章中,我們嘗試用最簡單的方式來完成一個Redis克隆版。開啟一個通訊端來監聽,為每個使用者端單獨分配一個Task來從網路讀取資料,解析命名並執行它。雖然在流水線上有一些小的改進,但也只僅此而已。
讓我們退一步來構建一個與Redis架構更為接近的Redis克隆版。為此,我們需要在一個執行緒中完成所有工作。這在C#中是比較難實現的,沒有用於執行Redis那樣工作型別的API。更確切的來說是有Socket.Select()
方法,但是需要我們自己在此基礎上構建一切(比如我們必須寫程式碼處理緩衝、字串等等)。
考慮到這是通往最終建議的架構的一箇中途站,我決定完全跳過這個。相反,我將首先專注於消除系統中的主要瓶頸,即ConcurrentDictionary
。
分析器的結果表明,我們這最大的開銷就是ConcurrentDictionary
的可伸縮性。即使我使用了1024個分片的鎖,它仍然佔用50%的時間開銷。問題是,我們能做得更好嗎?我們可以嘗試一個更好的選擇,就是我們不再使用ConcurrentDictionary
,而是直接使用單獨的Dictionary
來分片,這樣的話每個Dictionary
都不需要並行就可以存取。
我的想法是這樣的,我們將為使用者端提供常規的讀寫操作。但是,我們不會直接在I/O上處理這些命令,而是將其路由到一個專用的執行緒(使用它自己的Dictionary
)來完成這項工作。因為我是16核的機器,我將建立10個這樣的執行緒(假設它們每個都能分配到1個核心),並且我能夠將I/O處理放到其餘的6個核心上。
以下是更改後的結果:
請注意,我們現在跑分的資料是125w/s,比上一次幾乎增長了25%。
下面是這一次新程式碼的分析器結果:
因此在本例中,花費了大量的時間來處理各種各樣的字串,等待GC(大約佔30%)。集合的成本下降了很多。
還有一些其它的開銷出現在我眼前,看看這裡:
對於「簡單」屬性查詢來說,這個開銷非常驚人。另外SubString
函數的呼叫開銷也很大,超過整個系統開銷的6%。
在研究系統其它部分時,看到了這個:
這真的很有趣,因為我們花了很多的時間在等待佇列中是否有新的元素,其實我們可以做更多的事情,而不是就在那乾等著。
我還嘗試了其它的執行緒數量,如果只執行一個ExecWorker,我們的執行速度是40w/s,兩個執行緒,我們的執行速度是70w/s。當使用4個專用於處理請求的執行緒時,我們的執行速度是106w/s。
因此,很明顯,我們需要重新考慮這種方案,我們不能夠正確地擴充套件到合適的數值。
注意,這種方法也不利用流水線。我們分別處理每個命令和其他命令。我的下一步是新增對使用這種方法的流水線的支援,並測量這種影響。
從另一方面來說,我們現在的效能還是100w/s,考慮到我只花了很少的時間來實現方案,從這個方案可以獲得25w/s的效能提升,這是令人激動人心的。從側面說,我們還有更多的事情可以做,但我想把重點放在修復我們第一個方案上。
下面是當前的狀態,因此您可以與原始程式碼比較。
using System.Collections.Concurrent;
using System.Net.Sockets;
using System.Threading.Channels;
var listener = new TcpListener(System.Net.IPAddress.Any, 6379);
listener.Start();
var redisClone = new RedisClone();
while (true)
{
var client = listener.AcceptTcpClient();
var _ = redisClone.HandleConnection(client); // run async
}
public class RedisClone
{
ShardedDictionary _state = new(Environment.ProcessorCount / 2);
public async Task HandleConnection(TcpClient tcp)
{
var _ = tcp;
var stream = tcp.GetStream();
var client = new Client
{
Tcp = tcp,
Dic = _state,
Reader = new StreamReader(stream),
Writer = new StreamWriter(stream)
{
NewLine = "\r\n"
}
};
await client.ReadAsync();
}
}
class Client
{
public TcpClient Tcp;
public StreamReader Reader;
public StreamWriter Writer;
public string Key;
public string? Value;
public ShardedDictionary Dic;
List<string> Args = new();
public async Task ReadAsync()
{
try
{
Args.Clear();
var lineTask = Reader.ReadLineAsync();
if (lineTask.IsCompleted == false)
{
await Writer.FlushAsync();
}
var line = await lineTask;
if (line == null)
{
using (Tcp)
{
return;
}
}
if (line[0] != '*')
throw new InvalidDataException("Cannot understand arg batch: " + line);
var argsv = int.Parse(line.Substring(1));
for (int i = 0; i < argsv; i++)
{
line = await Reader.ReadLineAsync();
if (line == null || line[0] != '$')
throw new InvalidDataException("Cannot understand arg length: " + line);
var argLen = int.Parse(line.Substring(1));
line = await Reader.ReadLineAsync();
if (line == null || line.Length != argLen)
throw new InvalidDataException("Wrong arg length expected " + argLen + " got: " + line);
Args.Add(line);
}
switch (Args[0])
{
case "GET":
Key = Args[1];
Value = null;
break;
case "SET":
Key = Args[1];
Value = Args[2];
break;
default:
throw new ArgumentOutOfRangeException("Unknown command: " + Args[0]);
}
Dic.Run(this);
}
catch (Exception e)
{
await HandleError(e);
}
}
public async Task NextAsync()
{
try
{
if (Value == null)
{
await Writer.WriteLineAsync("$-1");
}
else
{
await Writer.WriteLineAsync($"${Value.Length}\r\n{Value}");
}
await ReadAsync();
}
catch (Exception e)
{
await HandleError(e);
}
}
public async Task HandleError(Exception e)
{
using (Tcp)
{
try
{
string? line;
var errReader = new StringReader(e.ToString());
while ((line = errReader.ReadLine()) != null)
{
await Writer.WriteAsync("-");
await Writer.WriteLineAsync(line);
}
await Writer.FlushAsync();
}
catch (Exception)
{
// nothing we can do
}
}
}
}
class ShardedDictionary
{
Dictionary<string, string>[] _dics;
BlockingCollection<Client>[] _workers;
public ShardedDictionary(int shardingFactor)
{
_dics = new Dictionary<string, string>[shardingFactor];
_workers = new BlockingCollection<Client>[shardingFactor];
for (int i = 0; i < shardingFactor; i++)
{
var dic = new Dictionary<string, string>();
var worker = new BlockingCollection<Client>();
_dics[i] = dic;
_workers[i] = worker;
// readers
new Thread(() =>
{
ExecWorker(dic, worker);
})
{
IsBackground = true,
}.Start();
}
}
private static void ExecWorker(Dictionary<string, string> dic, BlockingCollection<Client> worker)
{
while (true)
{
var client = worker.Take();
if (client.Value != null)
{
dic[client.Key] = client.Value;
client.Value = null;
}
else
{
dic.TryGetValue(client.Key, out client.Value);
}
var _ = client.NextAsync();
}
}
public void Run(Client c)
{
var reader = _workers[c.GetHashCode() % _workers.Length];
reader.Add(c);
}
}
之前一直有朋友讓開通公眾號,由於一直比較忙沒有弄。
現在終於抽空弄好了,譯者公眾號如下,歡迎大家關注。
使用.NET簡單實現一個Redis的高效能克隆版(一)
使用.NET簡單實現一個Redis的高效能克隆版(二)