AlterNats是如何做到高效能的釋出訂閱的?

2022-07-12 12:01:11

前言

在過去的一些文章裡面,我們聊了一些.NET平臺上高效能程式設計的技巧,今天帶大家瞭解一下AlterNats這個庫是如何做到遠超同類SDK效能的。

NATS:NATS是一個開源、輕量級、高效能的分散式訊息中介軟體,實現了高可伸縮性和優雅的Publish/Subscribe模型。NATS的開發哲學認為高質量的QoS應該在使用者端構建,故只建立了Request-Reply,不提供 1.持久化 2.事務處理 3.增強的交付模式 4.企業級佇列等功能,所以它的效能可以非常好。

NATS.NET:NATS.NET是NATS官方實現的C#語言使用者端,它的架構和Go版本保持一致,導致沒有使用一些高效能的API和新的語法,效能整體較弱,不過它支援.NET4.6+和.NETStandard1.6+幾乎相容主流的.NET版本。

AlterNats:因為官方實現的NATS.NET效能較弱,所以大佬又實現使用了C#和.NET新特性和API編寫了這個高效能NATS使用者端,它的釋出訂閱效能比StackExchange.Redis和官方的Nats.Net快三倍以上。

上圖是8byte資料釋出訂閱效能對比,可以看到AlterNats遙遙領先,比官方的實現快了很多。下面就帶大家瞭解一下如何使用AlterNats和為什麼它能實現這麼高的效能。

使用

AlterNats的API完全採用async/await並保持C#原生風格。

// 建立一個連結
await using var conn = new NatsConnection();

// 訂閱訊息
var subscription = await conn.SubscribeAsync<Person>("foo", x =>
{
    Console.WriteLine($"Received {x}");
});

// 釋出訊息
await conn.PublishAsync("foo", new Person(30, "bar"));

// 退訂
subscription.Dipose();

// ---

public record Person(int Age, string Name);

NatsOptions/ConnectOptions是不可變的記錄,它可以使用C#的new和with語法,非常的方便。

// 可選設定項可以通過`with`關鍵字
var options = NatsOptions.Default with
{
    Url = "nats://127.0.0.1:9999",
    LoggerFactory = new MinimumConsoleLoggerFactory(LogLevel.Information),
    Serializer = new MessagePackNatsSerializer(),
    ConnectOptions = ConnectOptions.Default with
    {
        Echo = true,
        Username = "foo",
        Password = "bar",
    }
};

await using var conn = new NatsConnection(options);

它還提供了用於接收結果的標準協定。可以將其用作伺服器之間的簡單RPC,在某些情況下可能很有用。

// 伺服器端
await conn.SubscribeRequestAsync("foobar", (int x) => $"Hello {x}");

// 使用者端
var response = await conn.RequestAsync<int, string>("foobar", 100);

如何做到高效能的?

在之前的文章中,和大家聊過,高效能就是在相同的資源的情況下,能處理更多的資料。我們需要降低單次資料處理所耗費的資源(CPU、記憶體、磁碟等等),下面就帶大家瞭解AlterNats做了什麼,來節省這些資源。

高效能Socket程式設計

在C#中,最底層的網路處理類是Socket,如果你想要非同步、高效能的處理網路請求,你需要重用帶回撥的SocketAsyncEventArgs
然而,現在我們有更簡單的方式使用async/await方法,不需要複雜的SocketAsyncEventArgs,不過它有許多使用非同步的方法,需要你選擇正確的一個去使用。最簡單選擇的方法就是,用返回值為ValueTask的API就好了。

// 使用這些
public ValueTask ConnectAsync(string host, int port, CancellationToken cancellationToken)

public ValueTask<int> ReceiveAsync(Memory<byte> buffer, SocketFlags socketFlags, CancellationToken cancellationToken)

public ValueTask<int> SendAsync(ReadOnlyMemory<byte> buffer, SocketFlags socketFlags, CancellationToken cancellationToken))

// 不要使用這些
public Task ConnectAsync(string host, int port)

public Task<int> ReceiveAsync(ArraySegment<byte> buffer, SocketFlags socketFlags)

public Task<int> SendAsync(ArraySegment<byte> buffer, SocketFlags socketFlags)

返回ValueTask的API內部使用的AwaitableSocketAsyncEventArgs,它非常的高效,另外由於ValueTask是結構體型別,無需像Task一樣在堆上分配,還能簡單的享受到非同步帶來的效能提升。與SocketAsyncEventArgs相比,這是一個非常大的改進,SocketAsyncEventArgs非常難用,我強烈推薦上面提到的ValueTask API.

另外要注意的是,同步API可以使用Span<T>型別,但非同步的API只能使用Memroy<T>(因為資料需要存在在堆上),這個點不僅限於Socket網路程式設計,其它API也是一樣,如果整個系統在設計的時候就沒有考慮這些,那麼無法使用Span<T>型別可能為成為障礙。但是你必須保證你可以隨心所欲的使用Memory<T>

使用二進位制解析文字協定

NATS的協定是基於文字的協定,和Redis等協定類似,它可以簡單通過字串函數來拆分和處理。可以使用StreamReader很容易的實現這個協定,因為你所需要就是ReadLine來讀取資料就好。然而,在網路上傳輸的是UTF-8格式的二進位制資料,將其作為字串來處理開銷較大,如果我們需要高效能,那麼必須將其作為二進位制資料來處理。

NATS的協定可以通過前導字串(INFO、MSG、PINT、+OK、-ERR等等)來確定訊息的型別。雖然可以很容易的使用if(msg == "INFO")這樣的程式碼來分割字串處理,但是出於效能原因,這樣的開銷是不可接受的。

那麼我們使用什麼樣的方法來提升效能呢?舉一個例子,字串INFO的UTF-8編碼是[73, 78, 70, 79]

我們可以直接使用ReadOnlySpan<byte>.SequenceEqual()函數來處理(這個函數優化非常好,它會使用SIMD來提高效能)。

不過在我們的場景裡,因為NATS的前導字串都在4byte以內,所以我們可以將INFO轉換為一個int型別來處理,比如將字串INFO的UTF-8編碼轉換為int就是1330007625.

所以在AlterNats裡面的程式碼就是這樣處理INFO的。

var msg = new byte[] {73,78,70,79};
if (Unsafe.ReadUnaligned<int>(ref MemoryMarshal.GetReference<byte>(msg)) == 1330007625) // INFO
{
	"Command is INFO".Dump();
}


這應該是在理論上最快的判斷方式了,3個字元的指令後面總是緊跟著空格或者換行符,所以可以使用下面這些常數來判斷其它的型別。


internal static class ServerOpCodes
{
    public const int Info = 1330007625;  // Encoding.ASCII.GetBytes("INFO") |> MemoryMarshal.Read<int>
    public const int Msg = 541545293;    // Encoding.ASCII.GetBytes("MSG ") |> MemoryMarshal.Read<int>
    public const int Ping = 1196312912;  // Encoding.ASCII.GetBytes("PING") |> MemoryMarshal.Read<int>
    public const int Pong = 1196314448;  // Encoding.ASCII.GetBytes("PONG") |> MemoryMarshal.Read<int>
    public const int Ok = 223039275;     // Encoding.ASCII.GetBytes("+OK\r") |> MemoryMarshal.Read<int>
    public const int Error = 1381123373; // Encoding.ASCII.GetBytes("-ERR") |> MemoryMarshal.Read<int>
}

使用棧上分配

在請求傳送中,有很多小的字串和byte[]物件,這些小物件會比較頻繁產生從而影響GC標記時間,在AlterNats中,比較多的使用了stackalloc byte[10]將這些小的物件分配在棧上,當方法結束時,物件就自動釋放了,無需GC再參與,有利於降低記憶體佔用率和GC的暫停時間。

public void WritePublish<T>(in NatsKey subject, ReadOnlyMemory<byte> inboxPrefix, int id, T? value, INatsSerializer serializer)  
{  
    // 棧上分配小物件
    Span<byte> idBytes = stackalloc byte[10];  
    if (Utf8Formatter.TryFormat(id, idBytes, out var written))  
    {  
        idBytes = idBytes.Slice(0, written);  
    }  
  
    var offset = 0;  
    var maxLengthWithoutPayload = CommandConstants.PubWithPadding.Length  
        + subject.LengthWithSpacePadding  
        + (inboxPrefix.Length + idBytes.Length + 1) // with space  
        + MaxIntStringLength  
        + NewLineLength;
}

自動管道批次處理

在NATS協定中,所有的寫入和讀取操作都是流水線的(批次處理)。這很容易用Redis的流水線來解釋。比如,如果你同一時間傳送3個訊息,每次傳送一個,然後等待響應,那麼多次往返的傳送和接收會成為效能瓶頸。

在傳送訊息中,AlterNats自動將它們組織成流水線:使用System.Threading.Channels,訊息被打包進入佇列,然後由一個寫回圈檢索它們,並將它們通過網路成批的傳送出去。一旦網路傳輸完成,寫回圈的方法又會將等待網路傳輸時累積的訊息再次進行批次處理。

這不僅能節省往返的時間(在NATS中,釋出和訂閱都是獨立的,所以不需要等待響應),另外它也能減少連續的系統呼叫。.NET最快的紀錄檔記錄元件ZLogger也採用了相同的方法。

將許多功能整合到單個物件中

為了實現這樣的PublishAsync方法,我們需要將資料放入佇列的Channel中,並且將其固定在堆上。我們還需要一個非同步方法的Task,以便我們可以用await等待它寫入完成。

await connection.PublishAsync(value);

為了高效地實現這樣一個API,避免多餘的分配,我們把所有的功能都在一個訊息物件(內部名稱叫Command)裡面,這樣的話只有它會被分配記憶體。

class AsyncPublishCommand<T> : 
    ICommand,
    IValueTaskSource, 
    IThreadPoolWorkItem, 
    IObjectPoolNode<AsyncPublishCommand<T>>

internal interface ICommand
{
    void Write(ProtocolWriter writer);
}

internal interface IObjectPoolNode<T>
{
    ref T? NextNode { get; }
}

這個物件(AsyncPublicCommand)本身就有用於儲存T型別資料和將其二進位制資料寫入Socket的角色(ICommand)。

此外,通過實現IValueTaskSource介面,該物件本身也變成了ValueTask。

然後,await後面的回撥需要交給執行緒池處理,以避免阻塞寫回圈。使用傳統的ThreadPool.QueueUserWorkItem(callback)會有額外的記憶體分配,因為它會在內部建立一個ThreadPoolWorkItem並將其塞入執行緒池佇列中。在.NET Core 3.0以後我們可以通過實現IThreadPoolWorkItem來避免內部ThreadPoolWorkItem物件的記憶體分配。

最後,我們只有一個物件需要分配,另外我們還可以池化這個物件,使其達到零分配(zero allocated)。可以使用ConcurrentQueue<T>或者類似的輕鬆實現物件池,上面的類中,通過實現IObjectPoolNode<T>介面,使它自己成為棧中的節點,避免分配陣列。堆疊也可以提供一個無效的實現,為這種快取的使用進行優化。

零拷貝架構

需要釋出、訂閱的資料通常是序列化的C#型別,比如Json、MessagePack等。在這種情況下,它們不可避免的會使用bytes[]交換資料,例如,StackExchange.Redis中的RedisValue內容實際上就是bytes[],無論是傳送還是接收,我們都需要建立和儲存bytes[]

為了避免這種情況,通常會使用ArrayPool來實現零分配,但是這仍然會產生複製的成本。當然,零分配是我們的目標,但是我們也要朝著zero-copy去努力。

AlterNats序列化要求使用IBufferWriter<byte>寫入,使用ReadOnlySequence<byte>來讀取。

public interface INatsSerializer
{
    int Serialize<T>(ICountableBufferWriter bufferWriter, T? value);
    T? Deserialize<T>(in ReadOnlySequence<byte> buffer);
}

public interface ICountableBufferWriter : IBufferWriter<byte>
{
    int WrittenCount { get; }
}

// ---

// 舉個例子,使用MessagePack來實現序列化和反序列化

public class MessagePackNatsSerializer : INatsSerializer
{
    public int Serialize<T>(ICountableBufferWriter bufferWriter, T? value)
    {
        var before = bufferWriter.WrittenCount;
        MessagePackSerializer.Serialize(bufferWriter, value);
        return bufferWriter.WrittenCount - before;
    }

    public T? Deserialize<T>(in ReadOnlySequence<byte> buffer)
    {
        return MessagePackSerializer.Deserialize<T>(buffer);
    }
}

C#的System.Text.Json或MessagePack有接收IBufferWriter<byte>引數的序列化過載方法。序列化器通過IBufferWriter直接讀取和寫入Socket提供的緩衝區,從而消除了Socket和序列化器之間的bytes[]複製。

在讀取時,ReadOnlySequence<byte>是必須的,因為從Socket接收的資料通常是分段的。

一種常見的設計模式就使用System.IO.Pipelines的PipeReader來讀取和處理資料,它目的是一個簡單使用的高效能I/O庫。但是AlterNats沒有使用Pipelines,而是使用了自己的讀取機制和ReadOnlySequence<byte>

System.Text.Json和MessagePack for C#的序列化方法提供了一個接受IBufferWriter<byte>引數的過載,反序列化方法接受ReadOnlySequence<byte>。換句話說,現代序列化器必須支援IBufferWriter<byte>ReadOnlySequence<byte>

總結

本文內容70%來自AlterNats作者的部落格文章,這是一篇不可多得的好文章,詳細的說明了AlterNats是如何做到高效能的,讓我們在回顧一下。

  • 使用最新的Socket ValueTask API
  • 將所有的功能放到單個物件中,降低SDK的記憶體分配
  • 池化SDK使用類,棧上分配資料,做到堆上零分配
  • 使用二進位制方式解析NATS協定
  • 對讀取和寫入自動進行批次處理
  • 使用IBufferWriter<byte>ReadOnlySequence<byte>,對網路資料處理做到zero-copy

附錄

AlterNats專案地址: https://github.com/Cysharp/AlterNats