使用.NET簡單實現一個Redis的高效能克隆版(三)

2022-08-08 12:03:55

譯者注

該原文是Ayende Rahien大佬業餘自己在使用C# 和 .NET構建一個簡單、高效能相容Redis協定的資料庫的經歷。
首先這個"Redis"是非常簡單的實現,但是他在優化這個簡單"Redis"路程很有趣,也能給我們在從事效能優化工作時帶來一些啟示。
原作者:Ayende Rahien
原連結:https://ayende.com/blog/197473-C/high-performance-net-building-a-redis-clone-architecture

構建Redis克隆版-架構

在之前的文章中,我們嘗試用最簡單的方式來完成一個Redis克隆版。開啟一個通訊端來監聽,為每個使用者端單獨分配一個Task來從網路讀取資料,解析命名並執行它。雖然在流水線上有一些小的改進,但也只僅此而已。

讓我們退一步來構建一個與Redis架構更為接近的Redis克隆版。為此,我們需要在一個執行緒中完成所有工作。這在C#中是比較難實現的,沒有用於執行Redis那樣工作型別的API。更確切的來說是有Socket.Select()方法,但是需要我們自己在此基礎上構建一切(比如我們必須寫程式碼處理緩衝、字串等等)。

考慮到這是通往最終建議的架構的一箇中途站,我決定完全跳過這個。相反,我將首先專注於消除系統中的主要瓶頸,即ConcurrentDictionary

分析器的結果表明,我們這最大的開銷就是ConcurrentDictionary的可伸縮性。即使我使用了1024個分片的鎖,它仍然佔用50%的時間開銷。問題是,我們能做得更好嗎?我們可以嘗試一個更好的選擇,就是我們不再使用ConcurrentDictionary,而是直接使用單獨的Dictionary來分片,這樣的話每個Dictionary都不需要並行就可以存取。

我的想法是這樣的,我們將為使用者端提供常規的讀寫操作。但是,我們不會直接在I/O上處理這些命令,而是將其路由到一個專用的執行緒(使用它自己的Dictionary)來完成這項工作。因為我是16核的機器,我將建立10個這樣的執行緒(假設它們每個都能分配到1個核心),並且我能夠將I/O處理放到其餘的6個核心上。

以下是更改後的結果:

請注意,我們現在跑分的資料是125w/s,比上一次幾乎增長了25%。
下面是這一次新程式碼的分析器結果:

因此在本例中,花費了大量的時間來處理各種各樣的字串,等待GC(大約佔30%)。集合的成本下降了很多。
還有一些其它的開銷出現在我眼前,看看這裡:

對於「簡單」屬性查詢來說,這個開銷非常驚人。另外SubString函數的呼叫開銷也很大,超過整個系統開銷的6%。
在研究系統其它部分時,看到了這個:

這真的很有趣,因為我們花了很多的時間在等待佇列中是否有新的元素,其實我們可以做更多的事情,而不是就在那乾等著。

我還嘗試了其它的執行緒數量,如果只執行一個ExecWorker,我們的執行速度是40w/s,兩個執行緒,我們的執行速度是70w/s。當使用4個專用於處理請求的執行緒時,我們的執行速度是106w/s。

因此,很明顯,我們需要重新考慮這種方案,我們不能夠正確地擴充套件到合適的數值。
注意,這種方法也不利用流水線。我們分別處理每個命令和其他命令。我的下一步是新增對使用這種方法的流水線的支援,並測量這種影響。

從另一方面來說,我們現在的效能還是100w/s,考慮到我只花了很少的時間來實現方案,從這個方案可以獲得25w/s的效能提升,這是令人激動人心的。從側面說,我們還有更多的事情可以做,但我想把重點放在修復我們第一個方案上。

下面是當前的狀態,因此您可以與原始程式碼比較


using System.Collections.Concurrent;
using System.Net.Sockets;
using System.Threading.Channels;

var listener = new TcpListener(System.Net.IPAddress.Any, 6379);
listener.Start();

var redisClone = new RedisClone();

while (true)
{
    var client = listener.AcceptTcpClient();
    var _ = redisClone.HandleConnection(client); // run async
}

public class RedisClone
{
    ShardedDictionary _state = new(Environment.ProcessorCount / 2);

    public async Task HandleConnection(TcpClient tcp)
    {
        var _ = tcp;
        var stream = tcp.GetStream();
        var client = new Client
        {
            Tcp = tcp,
            Dic = _state,
            Reader = new StreamReader(stream),
            Writer = new StreamWriter(stream)
            {
                NewLine = "\r\n"
            }
        };
        await client.ReadAsync();

    }

}

class Client
{
    public TcpClient Tcp;
    public StreamReader Reader;
    public StreamWriter Writer;
    public string Key;
    public string? Value;

    public ShardedDictionary Dic;

    List<string> Args = new();

    public async Task ReadAsync()
    {
        try
        {
            Args.Clear();
            var lineTask = Reader.ReadLineAsync();
            if (lineTask.IsCompleted == false)
            {
                await Writer.FlushAsync();
            }
            var line = await lineTask;
            if (line == null)
            {

                using (Tcp)
                {
                    return;
                }
            }
            if (line[0] != '*')
                throw new InvalidDataException("Cannot understand arg batch: " + line);

            var argsv = int.Parse(line.Substring(1));
            for (int i = 0; i < argsv; i++)
            {
                line = await Reader.ReadLineAsync();
                if (line == null || line[0] != '$')
                    throw new InvalidDataException("Cannot understand arg length: " + line);
                var argLen = int.Parse(line.Substring(1));
                line = await Reader.ReadLineAsync();
                if (line == null || line.Length != argLen)
                    throw new InvalidDataException("Wrong arg length expected " + argLen + " got: " + line);

                Args.Add(line);
            }

            switch (Args[0])
            {
                case "GET":
                    Key = Args[1];
                    Value = null;
                    break;
                case "SET":
                    Key = Args[1];
                    Value = Args[2];
                    break;
                default:
                    throw new ArgumentOutOfRangeException("Unknown command: " + Args[0]);
            }
            Dic.Run(this);
        }
        catch (Exception e)
        {
            await HandleError(e);
        }
    }

    public async Task NextAsync()
    {
        try
        {
            if (Value == null)
            {
                await Writer.WriteLineAsync("$-1");
            }
            else
            {
                await Writer.WriteLineAsync($"${Value.Length}\r\n{Value}");
            }
            await ReadAsync();
        }
        catch (Exception e)
        {
            await HandleError(e);
        }
    }

    public async Task HandleError(Exception e)
    {
        using (Tcp)
        {
            try
            {
                string? line;
                var errReader = new StringReader(e.ToString());
                while ((line = errReader.ReadLine()) != null)
                {
                    await Writer.WriteAsync("-");
                    await Writer.WriteLineAsync(line);
                }
                await Writer.FlushAsync();
            }
            catch (Exception)
            {
                // nothing we can do
            }
        }
    }
}

class ShardedDictionary
{
    Dictionary<string, string>[] _dics;
    BlockingCollection<Client>[] _workers;

    public ShardedDictionary(int shardingFactor)
    {
        _dics = new Dictionary<string, string>[shardingFactor];
        _workers = new BlockingCollection<Client>[shardingFactor];

        for (int i = 0; i < shardingFactor; i++)
        {
            var dic = new Dictionary<string, string>();
            var worker = new BlockingCollection<Client>();
            _dics[i] = dic;
            _workers[i] = worker;
            // readers
            new Thread(() =>
            {
                ExecWorker(dic, worker);
            })
            {
                IsBackground = true,
            }.Start();
        }
    }

    private static void ExecWorker(Dictionary<string, string> dic, BlockingCollection<Client> worker)
    {
        while (true)
        {
            var client = worker.Take();
            if (client.Value != null)
            {
                dic[client.Key] = client.Value;
                client.Value = null;
            }
            else
            {
                dic.TryGetValue(client.Key, out client.Value);
            }
            var _ = client.NextAsync();
        }
    }

    public void Run(Client c)
    {
        var reader = _workers[c.GetHashCode() % _workers.Length];
        reader.Add(c);
    }

}

公眾號

之前一直有朋友讓開通公眾號,由於一直比較忙沒有弄。
現在終於抽空弄好了,譯者公眾號如下,歡迎大家關注。

系列連結

使用.NET簡單實現一個Redis的高效能克隆版(一)
使用.NET簡單實現一個Redis的高效能克隆版(二)