MQTTnet是一個高效能的 .NET MQTT庫,它提供MQTT使用者端和MQTT伺服器的功能,支援到最新MQTT5協定版本,支援.Net Framework4.5.2版本或以上。
MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker) and supports the MQTT protocol up to version 5. It is compatible with mostly any supported .NET Framework version and CPU architecture.
我有一些小型專案,需要安裝在區域網環境下的windows或linux系統,這個安裝過程需要小白也能安裝,而且每天都有可能有多份新的安裝部署的新環境,所以流行的mqtt伺服器emqx可能變得不太適合我的選型,因為讓小白來大量部署它不是非常方便。
我的這個小專案主體是一個Web專案,瀏覽器使用者物件是管理員,資料的產生者是N多個廉價linux小型裝置,裝置使用mqtt協定高頻提交資料到後臺,後臺也需要使用mqtt協定來主動控制裝置完成一些操作動作。除此之後,Web瀏覽器也需要使用mqtt over websocket來訂閱一些主題,達到監控某臺裝置的實時資料目的。
經過比較,MQTTnet變成了我意向使用的mqtt庫,尤其是MQTTnet.AspNetCore
子專案,基於kestrel來使用tcp或websocket做傳輸層,增加mqtt應用層協定的解析,最後讓mqtt與asp.netcore完美地融合在一起。
專案有後臺主動傳送mqtt到裝置以控制裝置的需求,在mqttnet裡有個對應的InjectApplicationMessage()
擴充套件方法可以從server主動傳送mqtt到client,但這個方法總是丟擲ArgumentNullException
。但如果使用InjectApplicationMessage (InjectedMqttApplicationMessage)
這個基礎方法來注入mqtt訊息不有異常。
經過一段時間後,閒時的我決定遷出mqttnet專案的原始碼來偵錯分析。最後發現是因為這個擴充套件方法沒有傳遞SenderClientId導致的異常,所以我決定嘗試修改並推播一個請求到mqttnet專案。
經過嘗試修改一個小小bug之後,我開始認真的閱讀MQTTnet.AspNetCore
的原始碼,陸續發現一些可以減少記憶體複製和記憶體分配的優化點:
ReadOnlyMemory<byte>
轉為ReceivedMqttPacket
過程優化;MqttPacketBuffer
傳送過程的優化;Array.Copy()
的改進;Byte[]
-> ArraySegment<byte>
的優化;ReadOnlyMemory<byte>
轉為byte[]
原始程式碼
var bodySlice = copy.Slice(0, bodyLength);
var buffer = bodySlice.GetMemory().ToArray();
var receivedMqttPacket = new ReceivedMqttPacket(fixedHeader, new ArraySegment<byte>(buffer, 0, buffer.Length), buffer.Length + 2);
static ReadOnlyMemory<byte> GetMemory(this in ReadOnlySequence<byte> input)
{
if (input.IsSingleSegment)
{
return input.First;
}
// Should be rare
return input.ToArray();
}
原始程式碼設計了一個GetMemory()
方法,目的是在兩個地方呼叫到。但它的一句var buffer = bodySlice.GetMemory().ToArray()
,就會無條件的產生一次記憶體分配和一次記憶體拷貝。
改程序式碼
var bodySlice = copy.Slice(0, bodyLength);
var bodySegment = GetArraySegment(ref bodySlice);
var receivedMqttPacket = new ReceivedMqttPacket(fixedHeader, bodySegment, headerLength + bodyLength);
static ArraySegment<byte> GetArraySegment(ref ReadOnlySequence<byte> input)
{
if (input.IsSingleSegment && MemoryMarshal.TryGetArray(input.First, out var segment))
{
return segment;
}
// Should be rare
var array = input.ToArray();
return new ArraySegment<byte>(array);
}
因為有其它地方的優化,GetMemory()
不再需要複用,所以我們直接改為GetArraySegment()
,裡面使用MemoryMarshal.TryGetArray()
方法嘗試從ReadOnlyMemory<byte>
獲取ArraySegment<byte>
物件。而mqttnet的ReceivedMqttPacket
物件是支援ArraySegment<byte>
型別引數的。
在我提交請求之後,@gfoidl給了很多其它特別好的效能方面的建議,有興趣的同學可以點此檢視。
戲劇性的是,在我嘗試改進這個問題的時候,我發現了mqttnet的另外一個BUG:當bodySegment的Offset不是0開始的時候,mqttnet會產生異常。這足以說明,mqttnet專案從未使用Offset大於0的ArraySegment<byte>
,所以這個bug才一直沒有發現。本為不是MQTTnet.AspNetCore
子專案的程式碼我就不改的原則,我向mqttnet提了問題:https://github.com/dotnet/MQTTnet/issues/1592 作者也很認真看待這個問題,於是自己加班解決:https://github.com/dotnet/MQTTnet/pull/1593
更戲劇性的是,我開心地合併main程式碼過來驗證之後,發現作者改的BUG裡又帶入了BUG!現在Offset大於0還是有問題。於是我心急啊,我決定為這個BUG中BUG提交一個修改的請求:https://github.com/dotnet/MQTTnet/pull/1598
最後,這個MemoryMarshal.TryGetArray()
的優化終於提到合併,改進後CPU時間時間也減少了,記憶體分配更是減少了50%。
MqttPacketBuffer
傳送過程的優化MqttPacketBuffer有兩個資料段:Pacaket段和Payload段,我看到它原始傳送程式碼如下:
var buffer = formatter.Encode(packet);
var msg = buffer.Join().AsMemory();
var output = _output;
var result = await output.WriteAsync(msg, cancellationToken).ConfigureAwait(false);
我也沒有經過認證思考,覺得這裡可以將Pacaket段和Payload直接兩次傳送即可。
var buffer = PacketFormatterAdapter.Encode(packet);
await _output.WriteAsync(buffer.Packet, cancellationToken).ConfigureAwait(false);
if (buffer.Payload.Count > 0)
{
await _output.WriteAsync(buffer.Payload, cancellationToken).ConfigureAwait(false);
}
後來作者說,當mqtt over websocket時,有些使用者端在實現上沒能相容一個mqtt包分多個websocket幀傳輸的處理,所以需要合併行送。那我就想,如果我檢測傳輸層是websocket的話再Join合併就行了,於是改為如下:
if (_isOverWebSocket == false)
{
await _output.WriteAsync(buffer.Packet, cancellationToken).ConfigureAwait(false);
if (buffer.Payload.Count > 0)
{
await _output.WriteAsync(buffer.Payload, cancellationToken).ConfigureAwait(false);
}
}
else
{
var bufferSegment = buffer.Join();
await _output.WriteAsync(bufferSegment, cancellationToken).ConfigureAwait(false);
}
雖然覺得這個方案比之前要好了一些,但感覺Jion裡的 new byte[]
的分配讓我耿耿於懷。再經過幾將進改,最後的程式碼如下,雖然也有拷貝,但至少已經沒有分配:
if (buffer.Payload.Count == 0)
{
// zero copy
// https://github.com/dotnet/runtime/blob/main/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/StreamPipeWriter.cs#L279
await _output.WriteAsync(buffer.Packet, cancellationToken).ConfigureAwait(false);
}
else
{
WritePacketBuffer(_output, buffer);
await _output.FlushAsync(cancellationToken).ConfigureAwait(false);
}
static void WritePacketBuffer(PipeWriter output, MqttPacketBuffer buffer)
{
// copy MqttPacketBuffer's Packet and Payload to the same buffer block of PipeWriter
// MqttPacket will be transmitted within the bounds of a WebSocket frame after PipeWriter.FlushAsync
var span = output.GetSpan(buffer.Length);
buffer.Packet.AsSpan().CopyTo(span);
buffer.Payload.AsSpan().CopyTo(span.Slice(buffer.Packet.Count));
output.Advance(buffer.Length);
}
Array.Copy()
的改進mqttnet由於要相容很多.net框架和版本,所以往往能使用的api不多,比如在記憶體拷貝了,還保留了最初的Array.Copy()
,我們可以較新的框架下使用更好的api來複制,最高可達25%的複製效能提升,這個改進的工作量非常小,但產出是相當的可喜啊。
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void Copy(byte[] source, int sourceIndex, byte[] destination, int destinationIndex, int length)
{
#if NETCOREAPP3_1_OR_GREATER || NETSTANDARD2_1
source.AsSpan(sourceIndex, length).CopyTo(destination.AsSpan(destinationIndex, length));
#elif NET461_OR_GREATER || NETSTANDARD1_3_OR_GREATER
unsafe
{
fixed (byte* pSoure = &source[sourceIndex])
{
fixed (byte* pDestination = &destination[destinationIndex])
{
System.Buffer.MemoryCopy(pSoure, pDestination, length, length);
}
}
}
#else
Array.Copy(source, sourceIndex, destination, destinationIndex, length);
#endif
}
Byte[]
-> ArraySegment<byte>
的優化當前的mqttnet,由於歷史設計的侷限原因,現在還不能建立ArraySegment<byte>
或Memory<byte>
作為payload的mqtt訊息包。如果我們從ArrayPool申請1000位元組的buffer,實際我們會得到一個到1024位元組的buffer,想拿租賃的buffer的前1000位元組做mqtt訊息的payload,我們現在不得不再建立一個1000位元組的byte[1000]
newpayload,然後拷貝buffer到newpayload。
這種侷限對伺服器端來說弊端是很大的,我現在嘗試如何不破壞原始的byte[]
支援的設計提前下,讓mqttnet也支援ArraySegment<byte>
的資料傳送。當然,保持相容性的新Api加入對專案來說是一種大的變化,自然有一定的風險性。
如果你也關注這個mqttnet專案,你可以檢視 https://github.com/dotnet/MQTTnet/pull/1585 這個提議,也許未來它會變成現實。
開源專案讓大眾受益,尤其是核心作者真的不容易,為其嘔心瀝血。我們在受益的同時,如果有能力的話可以反撫開源專案,在參與過程中,自身也會學到一些知識的,就當作被學習的過程吧。