該原文是Ayende Rahien大佬業餘自己在使用C# 和 .NET構建一個簡單、高效能相容Redis協定的資料庫的經歷。
首先這個"Redis"是非常簡單的實現,但是他在優化這個簡單"Redis"路程很有趣,也能給我們在從事效能優化工作時帶來一些啟示。
由於接下來的兩篇較短,本文一起把它們一起翻譯
原作者:Ayende Rahien
原連結:
https://ayende.com/blog/197505-C/high-performance-net-building-a-redis-clone-separation-of-computation-i-o
另外Ayende大佬是.NET開源的高效能多正規化資料庫RavenDB所在公司的CTO,不排除這些文章是為了以後會在RavenDB上相容Redis協定做的嘗試。大家也可以多多支援,下方給出了連結
RavenDB地址:https://github.com/ravendb/ravendb
在達到125w/s的效能以後,我決定試試把程式碼修改成流水線(pipeline)會發生什麼。這個改動很複雜,因為我要追蹤所有的輸入請求,又需要將輸入請求傳送到對應的的多個執行緒進行處理。
在我看來,這些程式碼本身就是垃圾。但是隻要它能在架構上為我指明正確的方向,那麼就是值得的。您可以再下面閱讀那些程式碼,但是它有點複雜,我們儘可能的多讀取使用者端請求,然後將其傳送到每個專用執行緒來執行它。
就效能而言,它比上一個版本的程式碼慢(大約20%),但是它有一個好處,那就是能很容易的看出哪裡的花費的資源最多。
看看下面的分析器結果:
您可以看到,我們在 I/O 和字串處理方面花費了很多時間。GC也花費了很多時間。
我想分階段解決這個問題。第一部分是停止到處使用字串。之後的下一個階段可能是更改 I/O 模型。
就目前而言,我們的程式碼是這樣的:
using System.Collections.Concurrent;
using System.Net.Sockets;
using System.Text;
using System.Threading.Channels;
var listener = new TcpListener(System.Net.IPAddress.Any, 6379);
listener.Start();
ShardedDictionary _state = new(Environment.ProcessorCount / 2);
while (true)
{
var tcp = listener.AcceptTcpClient();
var stream = tcp.GetStream();
var client = new Client(tcp, new StreamReader(stream), new StreamWriter(stream)
{
AutoFlush = true
}, _state);
var _ = client.ReadAsync();
}
class Client
{
public readonly TcpClient Tcp;
public readonly StreamReader Reader;
public readonly StreamWriter Writer;
public readonly ShardedDictionary Dic;
public struct Command
{
public string Key;
public string? Value;
public bool Completed;
}
private List<string> _args = new();
private Task<string?> _nextLine;
private Command[] _commands = Array.Empty<Command>();
private int _commandsLength = 0;
private StringBuilder _buffer = new();
private int _shardFactor;
public Client(TcpClient tcp, StreamReader reader, StreamWriter writer, ShardedDictionary dic)
{
Tcp = tcp;
Reader = reader;
Writer = writer;
Dic = dic;
_shardFactor = dic.Factor;
}
public async Task ReadAsync()
{
try
{
while (true)
{
if (_buffer.Length != 0)
{
await Writer.WriteAsync(_buffer);
_buffer.Length = 0;
}
var lineTask = _nextLine ?? Reader.ReadLineAsync();
if (lineTask.IsCompleted == false)
{
if (_commandsLength != 0)
{
_nextLine = lineTask;
Dic.Enqueue(this, Math.Abs(_commands[0].Key.GetHashCode()) % _shardFactor);
return;
}
}
var line = await lineTask;
_nextLine = null;
if (line == null)
{
using (Tcp) // done reading...
{
return;
}
}
await ReadCommand(line);
AddCommand();
}
}
catch (Exception e)
{
await HandleError(e);
}
}
private async Task ReadCommand(string line)
{
_args.Clear();
if (line[0] != '*')
throw new InvalidDataException("Cannot understand arg batch: " + line);
var argsv = int.Parse(line.Substring(1));
for (int i = 0; i < argsv; i++)
{
line = await Reader.ReadLineAsync() ?? string.Empty;
if (line[0] != '$')
throw new InvalidDataException("Cannot understand arg length: " + line);
var argLen = int.Parse(line.Substring(1));
line = await Reader.ReadLineAsync() ?? string.Empty;
if (line.Length != argLen)
throw new InvalidDataException("Wrong arg length expected " + argLen + " got: " + line);
_args.Add(line);
}
}
private void AddCommand()
{
if (_commandsLength >= _commands.Length)
{
Array.Resize(ref _commands, _commands.Length + 8);
}
ref Command cmd = ref _commands[_commandsLength++];
cmd.Completed = false;
switch (_args[0])
{
case "GET":
cmd.Key = _args[1];
cmd.Value = null;
break;
case "SET":
cmd.Key = _args[1];
cmd.Value = _args[2];
break;
default:
throw new ArgumentOutOfRangeException("Unknown command: " + _args[0]);
}
}
public async Task NextAsync()
{
try
{
WriteToBuffer();
await ReadAsync();
}
catch (Exception e)
{
await HandleError(e);
}
}
private void WriteToBuffer()
{
for (int i = 0; i < _commandsLength; i++)
{
ref Command cmd = ref _commands[i];
if (cmd.Value == null)
{
_buffer.Append("$-1\r\n");
}
else
{
_buffer.Append($"${cmd.Value.Length}\r\n{cmd.Value}\r\n");
}
}
_commandsLength = 0;
}
public async Task HandleError(Exception e)
{
using (Tcp)
{
try
{
string? line;
var errReader = new StringReader(e.ToString());
while ((line = errReader.ReadLine()) != null)
{
await Writer.WriteAsync("-");
await Writer.WriteLineAsync(line);
}
await Writer.FlushAsync();
}
catch (Exception)
{
// nothing we can do
}
}
}
internal void Execute(Dictionary<string, string> localDic, int index)
{
int? next = null;
for (int i = 0; i < _commandsLength; i++)
{
ref var cmd = ref _commands[i];
var cur = Math.Abs(cmd.Key.GetHashCode()) % _shardFactor;
if (cur == index) // match
{
cmd.Completed = true;
if (cmd.Value != null)
{
localDic[cmd.Key] = cmd.Value;
}
else
{
localDic.TryGetValue(cmd.Key, out cmd.Value);
}
}
else if (cmd.Completed == false)
{
next = cur;
}
}
if (next != null)
{
Dic.Enqueue(this, next.Value);
}
else
{
_ = NextAsync();
}
}
}
class ShardedDictionary
{
Dictionary<string, string>[] _dics;
BlockingCollection<Client>[] _workers;
public int Factor => _dics.Length;
public ShardedDictionary(int shardingFactor)
{
_dics = new Dictionary<string, string>[shardingFactor];
_workers = new BlockingCollection<Client>[shardingFactor];
for (int i = 0; i < shardingFactor; i++)
{
var dic = new Dictionary<string, string>();
var worker = new BlockingCollection<Client>();
_dics[i] = dic;
_workers[i] = worker;
var index = i;
// readers
new Thread(() =>
{
ExecWorker(dic, index, worker);
})
{
IsBackground = true,
}.Start();
}
}
private static void ExecWorker(Dictionary<string, string> dic, int index, BlockingCollection<Client> worker)
{
while (true)
{
worker.Take().Execute(dic, index);
}
}
public void Enqueue(Client c, int index)
{
_workers[index].Add(c);
}
}
現在,我已經完成了這些簡單的工作,我決定將Redis實現改為使用System.IO.Pipelines
。這是一個高效能的I/O API,專門針對那些需要高系統效能的伺服器設計。
API有一點不同,但是它的使用方式非常合乎邏輯,並且有意義。下面是用於處理來自使用者端命令的主迴圈:
public async Task HandleConnection()
{
while (true)
{
var result = await _netReader.ReadAsync();
var (consumed, examined) = ParseNetworkData(result);
_netReader.AdvanceTo(consumed, examined);
await _netWriter.FlushAsync();
}
}
我們的想法是,我們從網路獲得一個緩衝區,我們讀取一切(包括流水線命令) ,然後重新整理到使用者端。當我們開始處理實際的命令時,更有趣的事情發生了,因為現在我們使用的不是 StreamReader
而是PipeReader
。所以我們處理的是位元組級別,而不是字串級別。
下面是大致的程式碼,我沒有展示整個程式碼,因為我想集中在我遇到的問題:
(SequencePosition Consumed, SequencePosition Examined) ParseNetworkData(ReadResult result)
{
var reader = new SequenceReader<byte>(result.Buffer);
while (true)
{
_cmds.Clear();
if (reader.TryReadTo(out ReadOnlySpan<byte> line, (byte)'\n') == false)
return (reader.Consumed, reader.Position);
if (line.Length == 0 || line[0] != '*' || line[line.Length - 1] != '\r')
ThrowBadBuffer(result.Buffer);
if (Utf8Parser.TryParse(line.Slice(1), out int argc, out int bytesConsumed) == false ||
bytesConsumed + 2 != line.Length) // account for the * and \r
ThrowBadBuffer(result.Buffer);
for (int i = 0; i < argc; i++)
{
// **** redacted - reading cmd to _cmds buffer
}
ExecCommand(_cmds);
}
}
程式碼從緩衝區讀取並解析Redis協定,然後執行命令。它在同一個緩衝區(流水線)中支援多個命令,而且效能非常糟糕。
是的,相對於使用字串的簡單性而言,對於位元組處理想使用正確API要難得多,而且它的速度比字串還要慢得多。在我的開發機器上,我說的慢得多是指以下幾點:
是的,你沒有看錯,每秒少於100次操作,而未優化版本的操作則超過10萬次。
你可以想象,那真是... 令人驚訝。
我實際上寫了兩次實現,使用不同的方法,試圖找出我做錯了什麼。使用PipeReader
肯定沒那麼糟。
我檢視了分析器的輸出,試圖弄清楚發生了什麼:
它非常清楚地表明,這個實現非常糟糕,不是嗎?到底怎麼回事?
底層的問題實際上相當簡單,並且與Pipelines API如何實現這麼高的效能有關。替代掉那些高頻的System call,您需要獲得一個緩衝區並處理。處理完緩衝區之後,您可以很方便的看到處理了多少資料,然後可以處理另一個呼叫。
然而,實際使用的資料和我們期望的資料是有區別的,如下所示:
# 請求redis 設定15位元組的Key - memtier-2818567
# 資料為256位元組 - xxxxxxxxxx ... xxxxxx
*3
$3
SET
$15
memtier-2818567
$256
xxxxxxxxxx ... xxxxxx
# 請求redis 獲取Key - memtier-2818567 對應的資料
*2
$3
GET
$15
memtier-7689405
# 請求redis 獲取Key - memtier-2818567 對應的資料
*2
$3
GET
$15
# !!! 這裡發現有問題,Key應是memtier-2818567 但是唯讀取出了memt
memt
您在這裡看到的是一個流水線命令,緩衝區中有335個位元組。我們將在一次讀取中中處理所有這些命令,除了... 看著最後四行。這是什麼?
我們得到了使用者端傳送來的部分命令。換句話說,我們需要執行一個Key大小為15位元組的GET操作,但是這裡只接收到了前4個位元組。這是意料之中的事,我們消耗了快取區所有空間,直到最後四行(從而讓 PipeReader 知道我們已經完成了它們)。
問題是,當我們現在在使用者端發出一個請求時,我們在伺服器端得到最後四行的部分(我們沒有使用它) ,但是我們還沒有準備好處理它。所以資料丟失了,PipeReader知道它需要從網路上讀取更多的資料。
但是... 我的程式碼有一個小bug。它將報告說它檢查了下面黃色的部分,而沒有檢查綠色的部分。
換句話說,我們告訴PipeReader,我們已經消費了緩衝區的一部分,又檢查了緩衝區的一部分,但緩衝區上還有一些位元組既沒有消費也沒有檢查。這意味著,當我們發出讀取呼叫,期望從網路上獲得資料時,我們實際上會再次獲得相同的緩衝區,進行完全相同的處理。
最終,我們在緩衝區中會有更多來自另使用者端的資料,雖然解決方案的正確性不會受到影響,但這會非常的影響效能。
修復非常簡單,我們需要告訴PipeReader我們檢查了整個緩衝區,這樣它就不會忙碌地等待和等待來自網路的更多資料。以下是錯誤修復方法:
< return (reader.Consumed, reader.Position);
修改為:
> return (reader.Consumed, result.Buffer.End);
有了這一改動,我們可以達到每秒187,104.21次操作!這比以前提高了50%,這真是太棒了。我還沒有對事情進行適當的分析,因為我還想解決另一個問題,我們如何處理來自網路的資料。在我的下一篇文章中會有更多關於這個問題的內容。
這一個微小的BUG大家可能比較難理解,因為很多人都沒有接觸過PipeReader這麼底層的API。我們來看看上文中while迴圈的程式碼:
public async Task HandleConnection()
{
while (true)
{
var result = await _netReader.ReadAsync();
var (consumed, examined) = ParseNetworkData(result);
// 主要是AdvanceTo方法,這個方法有兩個引數
// consumed: 目前處理了多少資料,比如redis協定是按行處理,也就是\n
// examined:檢查了多少資料,檢查的資料和處理的資料不一定一樣,因為
// 可能由於網路延時,還沒有接收一個完整的封包
_netReader.AdvanceTo(consumed, examined);
await _netWriter.FlushAsync();
}
}
另外就是修改點:
< return (reader.Consumed, reader.Position);
修改為:
> return (reader.Consumed, result.Buffer.End);
修改前的程式碼是檢查的資料是返回當前的Position
,但是當前的Position是小於我們實際上檢查的長度
按照 if (reader.TryReadTo(out ReadOnlySpan<byte> line, (byte)'\n') == false)
程式碼所示,我們其實檢查了流中的所有位置,只是從頭讀到尾巴沒有讀取到\n
,如上面的例子就是讀取到了最後一行,唯讀取了ment
,因為網路請求原因,完整的memtier-7689405\n
還沒有接收到。
此時我們返回Position
是上圖中黃色的部分,但是實際上我們是檢查到了綠色的memt
部分,返回到上層以後,執行_netReader.AdvanceTo(consumed, examined);
。
Pipeline發現還有剩餘的綠色memt
沒有被檢查,就會繼續走var (consumed, examined) = ParseNetworkData(result);
,又重新讀取了memt
,由於沒有\n
又返回了黃色部分的Position
,所以這裡就形成了忙等,再沒有新的資料到來之前,這裡將一直迴圈;雖然沒有BUG,但是非常影響效能。
而修改以後檢查位置返回result.Buffer.End
,就包括了綠色的memt
部分,這樣的話var result = await _netReader.ReadAsync();
只有當有新的資料到來時才會繼續走下面的程式碼,這樣的話充分的利用了Pipelines的優勢,效能會更加好。
之前一直有朋友讓開通公眾號,由於一直比較忙沒有弄。
現在終於抽空弄好了,譯者公眾號如下,歡迎大家關注。
使用.NET簡單實現一個Redis的高效能克隆版(一)
使用.NET簡單實現一個Redis的高效能克隆版(二)
使用.NET簡單實現一個Redis的高效能克隆版(三)