高效能的通訊端程式設計圍繞著兩個方面:非同步和複用。非同步:高效能就是最大化計算機資源的利用,是不可能讓執行緒有阻塞的,所以就有了各種非同步模式。複用:計算機資源最好是能重複使用的,頻繁的建立和銷燬相同的物件也是對資源的浪費,所以就有了各種池和零拷貝;CPU在存取相鄰資源的時候有特別的優勢可以利用快取區,所以池中物件儘量相鄰建立。
Socket通訊端程式設計歷史悠久,發展出好幾種方式,對應著DotNet非同步程式設計的發展,分別:非同步程式設計模式(Asynchronous Programming Model ,APM)、基於事件的非同步模式(Event-based Asynchronous Pattern ,EAP)和基於任務的非同步模式(Task-based Asynchronous Pattern,TAP)。
本文將簡要介紹幾種非同步程式設計對應Socket的實現,每一種都寫了一個簡單的Socket伺服器端以供學習。
通訊端流程如下,在Accept,Read,Write,Connect和Disconnect方法均涉及到非同步程式設計。為什麼會非同步,簡單來說就是執行緒執行速度很快,網路傳輸的IO速度很慢,執行緒發出IO操作的指令後,不可能一直等待指令執行完。所以執行緒設定一個回撥函數的入口地址,讓IO執行完之後呼叫該入口地址,之後執行緒就去幹其他事情了,等該IO呼叫該入口地址,執行緒再回來繼續工作。
Socket介面,下面是用阻塞方法建立的一個簡單伺服器端。可以分析出該服務的效能是很差的,沒有做任何的非同步和複用。
//伺服器端
public static void Run(string m_ip, int m_port)
{
var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
var ip = IPAddress.Parse(m_ip);
var endpoint = new IPEndPoint(ip, m_port);
socket.Bind(endpoint);
socket.Listen(0);
socket.ReceiveTimeout = -1;
//執行緒池中後臺執行緒執行
Task.Run(() =>
{
while (true)
{
var acceptSocket = socket.Accept();//執行緒阻塞等待連線請求佇列
if (acceptSocket != null && acceptSocket.Connected)
{
//執行緒池中後臺執行緒執行
Task.Run(() =>
{
byte[] receiveBuffer = new byte[1024];//每一個連線都在重新建立緩衝區
int result = 0;
do
{
if (acceptSocket.Connected)
{
result = acceptSocket.Receive(receiveBuffer, 0, receiveBuffer.Length,
SocketFlags.None,
out SocketError error);//執行緒阻塞等待緩衝區資料
if (error == SocketError.Success && result > 0)
{
var recestr = Encoding.UTF8.GetString(receiveBuffer, 0, result);
var Replaystr =
$"Server收到訊息:{recestr};Server收到訊息的時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff")}";
var strbytes = Encoding.UTF8.GetBytes(Replaystr);
acceptSocket.Send(strbytes, 0, strbytes.Length, SocketFlags.None);//執行緒阻塞等待傳送完緩衝區資料
if (recestr.Contains("stop"))
{
break;
}
}
}
else
{
break;
}
} while (result > 0);
}).ContinueWith((t) =>
{
System.Threading.Thread.Sleep(1000);
acceptSocket.Disconnect(false);
acceptSocket.Dispose();
});
}
}
}).Wait();
}
BeginXXX方法並不會阻塞執行緒,而EndXXX會,dotnet提供Task<T>.Factory.FromAsync
可以將APM轉成TAP模式非同步模式以提高效能,下面提供一個範例,同時使用ArrayPool複用緩衝區,處理分包,粘包等問。
public static Socket Run(string m_ip, int m_port)
{
var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
var ip = IPAddress.Any;
if (!string.IsNullOrEmpty(m_ip))
{
ip = IPAddress.Parse(m_ip);
}
var endpoint = new IPEndPoint(ip, m_port);
socket.Bind(endpoint);
Console.WriteLine($"[{DateTime.Now.GetFormString()}] Server Established localEndpoint:[{socket.LocalEndPoint.ToString()}]");
socket.Listen(200);
socket.ReceiveTimeout = -1;
//後臺執行緒執行
Task.Run(async () =>
{
while (true)
{
var acceptSocket = await Task<Socket>.Factory.FromAsync(
socket.BeginAccept(null,null)
,socket.EndAccept);//APM轉TAP非同步
if (acceptSocket != null && acceptSocket.Connected)
{
//後臺執行緒來處理Receive邏輯
var task = Task.Run(async () =>
{
byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);//從記憶體池中獲取緩衝區
var bytesBuffered = 0;
var bytesConsumed = 0;
while (true)
{
if (acceptSocket != null && acceptSocket.Connected)
{
var temremaining = bytesBuffered - bytesConsumed;
if (temremaining == 0)//快取區全部解析完
{
bytesBuffered = 0;
bytesConsumed = 0;
}
else if (temremaining < buffer.Length && temremaining > 0)//最後一個包不完整,部分資料未解析
{
var newbuffer = ArrayPool<byte>.Shared.Rent(buffer.Length);
Buffer.BlockCopy(buffer, bytesConsumed, newbuffer, 0, temremaining);
ArrayPool<byte>.Shared.Return(buffer);
buffer = newbuffer;
bytesBuffered = temremaining;
bytesConsumed = 0;
}
else //包不夠大,分包了
{
var newbuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
Buffer.BlockCopy(buffer, 0, newbuffer, 0, buffer.Length);
ArrayPool<byte>.Shared.Return(buffer);
buffer = newbuffer;
}
var bytesRemaining = buffer.Length - bytesBuffered;
try
{
var bytesread = await Task<int>.Factory.FromAsync(
acceptSocket.BeginReceive(buffer, bytesBuffered, bytesRemaining,
SocketFlags.None, null, null), acceptSocket.EndReceive);//APM轉TAP非同步
if (bytesread == 0)
{
break;
}
bytesbuffered += bytesread;
var lineposition = -1;
do
{
lineposition = array.indexof(buffer, (byte)0x23, bytesconsumed,bytesbuffered - bytesconsumed);
if (lineposition >= 0)
{
var lineLength = linePosition - bytesConsumed;
ProcessLine(acceptSocket, buffer, bytesConsumed, bytesread);
bytesConsumed += bytesread;
}
} while (linePosition >= 0);//包解析
}
catch (Exception e)
{
break;
}
}
else
{
break;
}
}
ArrayPool<byte>.Shared.Return(buffer);
}).ContinueWith((t) =>
{
Console.WriteLine($"[{DateTime.Now.GetFormString()}] ServerClient Disconnected localEndpoint:[{acceptSocket?.LocalEndPoint.ToString()}] remoteEndpoint:[{acceptSocket?.RemoteEndPoint.ToString()}]");
acceptSocket?.Shutdown(SocketShutdown.Both);
acceptSocket?.Close();
acceptSocket = null;
});
}
}
});
return socket;
}
目前應用最廣的Socket模型,完成埠模型還是按照"回撥函數"的方式進行來實現非同步,其本質是執行緒池,該執行緒池的核心工作就是去呼叫IO操作完成時的回撥函數。另外因為IO操作畢竟是慢速的操作,所以幾個執行緒就已經足可以應付成千上萬的輸入輸出完成操作的請求(前提就是你的回撥函數做的工作要足夠少),所以這個模型的效能是非常高的。也是現在Windows平臺上效能最好的輸入輸出模型。自定義構造了記憶體池,將一大塊記憶體切分成一定資料量的連續小記憶體,分別分配給不同的SocketAsyncEventArgs物件以提高服務效能,非常巴適;目前看到的FastSocket,SuperSocket,TouchSocket,NewLife等網路框架均採用這種模式,最主要的原因是應用範圍廣。
框架 | 版本 |
---|---|
.NET | Core 1.0, Core 1.1, Core 2.0, Core 2.1, Core 2.2, Core 3.0, Core 3.1, 5, 6, 7 Preview 7 |
.NET Framework | 2.0, 3.0, 3.5, 4.0, 4.5, 4.5.1, 4.5.2, 4.6, 4.6.1, 4.6.2, 4.7, 4.7.1, 4.7.2, 4.8 |
.NET Standard | 1.3, 1.4, 1.6, 2.0, 2.1 |
UWP | 10.0 |
Xamarin.iOS | 10.8 |
Xamarin.Mac | 3.0 |
public class MyIOCPSocket
{
private static int m_numConnections;//最大連線數
private static int m_receiveBufferSize;//接收快取區數量
private static int m_sendBufferSize;//傳送快取區大小
private static byte[] m_receivebuffer;//接收快取區
private static Stack<int> m_freeReceiveIndexPool;//可用的接收快取索引棧
private static int m_currentReceiveIndex;//當前的接收快取區索引
private static byte[] m_sendbuffer;//傳送快取區
private static Stack<int> m_freeSendIndexPool;//可用的傳送快取索引棧
private static int m_currentSendIndex;//當前的傳送快取區索引
private static Stack<SocketAsyncEventArgs> m_ReadPool;//接收SocketAsyncEventArgs池
private static Stack<SocketAsyncEventArgs> m_WritePool;//傳送SocketAsyncEventArgs池
private static Semaphore m_maxNumberAcceptedClients;//最大連線鎖
private static int m_numConnectedSockets;//連線的Socket數量
private static int m_totalBytesRead;//總的接收位元組數
private static Socket listenSocket;//監聽Socket
public static void Run(string m_ip, int m_port, int numConnections, int m_receiveBuffer, int m_sentBuffer)
{
//初始化
m_numConnections = numConnections;
m_receiveBufferSize = m_receiveBuffer;
m_sendBufferSize = m_sentBuffer;
m_receivebuffer = new byte[m_receiveBufferSize * m_numConnections];
m_freeReceiveIndexPool = new Stack<int>();
m_currentReceiveIndex = 0;
m_sendbuffer = new byte[m_sendBufferSize * m_numConnections];
m_freeSendIndexPool = new Stack<int>();
m_currentSendIndex = 0;
m_ReadPool = new Stack<SocketAsyncEventArgs>(m_numConnections);
m_WritePool = new Stack<SocketAsyncEventArgs>(m_numConnections);
m_maxNumberAcceptedClients = new Semaphore(m_numConnections, m_numConnections);
m_numConnectedSockets = 0;
m_totalBytesRead = 0;
//接收快取分配
for (int i = 0; i < m_numConnections; i++)
{
var readEventArg = new SocketAsyncEventArgs();
readEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(ReadWriteIOComleted);
readEventArg.UserToken = new AsyncUserToken();
if (m_freeReceiveIndexPool.Count > 0)
{
readEventArg.SetBuffer(m_receivebuffer, m_freeReceiveIndexPool.Pop(), m_receiveBufferSize);
}
else
{
if ((m_receiveBufferSize * m_numConnections - m_receiveBufferSize) < m_currentReceiveIndex)
{
new ArgumentException("接收快取設定異常");
}
readEventArg.SetBuffer(m_receivebuffer, m_currentReceiveIndex, m_receiveBufferSize);
m_currentReceiveIndex += m_receiveBufferSize;
}
m_ReadPool.Push(readEventArg);
//傳送快取分配
var writeEventArg = new SocketAsyncEventArgs();
writeEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(ReadWriteIOComleted);
writeEventArg.UserToken = new AsyncUserToken();
if (m_freeSendIndexPool.Count > 0)
{
writeEventArg.SetBuffer(m_sendbuffer, m_freeSendIndexPool.Pop(), m_sendBufferSize);
}
else
{
if ((m_sendBufferSize * m_numConnections - m_sendBufferSize) < m_currentSendIndex)
{
new ArgumentException("傳送快取設定異常");
}
writeEventArg.SetBuffer(m_sendbuffer, m_currentSendIndex, m_sendBufferSize);
m_currentSendIndex += m_sendBufferSize;
}
m_WritePool.Push(writeEventArg);
}
//設定監聽socket
listenSocket = new Socket(new IPEndPoint(IPAddress.Parse(m_ip), m_port).AddressFamily, SocketType.Stream, ProtocolType.Tcp);
//繫結埠
listenSocket.Bind(new IPEndPoint(IPAddress.Parse(m_ip), m_port));
listenSocket.Listen(100);
StartAccept(null);
Console.WriteLine("Press any key to terminate the server process....");
Console.ReadKey();
}
public static void ReadWriteIOComleted(object sender, SocketAsyncEventArgs e)
{
switch (e.LastOperation)
{
case SocketAsyncOperation.Receive:
ProcessReceive(e);
break;
case SocketAsyncOperation.Send:
ProcessSend(e);
break;
default:
throw new ArgumentException("The last operation completed on the socket was not a receive or send");
}
}
//傳送訊息回撥
public static void ProcessSend(SocketAsyncEventArgs e)
{
if (e.SocketError == SocketError.Success)
{
AsyncUserToken token = (AsyncUserToken)e.UserToken;
bool willRaiseEvent = token.Socket.ReceiveAsync(token.readEventArgs);
if (!willRaiseEvent)
{
ProcessReceive(token.readEventArgs);
}
}
else
{
CloseClientSocket(e);
}
}
//關閉使用者端
public static void CloseClientSocket(SocketAsyncEventArgs e)
{
AsyncUserToken token = e.UserToken as AsyncUserToken;
try
{
token.Socket.Shutdown(SocketShutdown.Send);
}
catch (Exception exception)
{
Console.WriteLine(exception);
}
token.Socket.Close();
Interlocked.Decrement(ref m_numConnectedSockets);
//將資源返回池中進行復用
m_ReadPool.Push(token.readEventArgs);
m_WritePool.Push(token.writeEventArgs);
token.Socket = null;
token.readEventArgs = null;
token.writeEventArgs = null;
m_maxNumberAcceptedClients.Release();
}
public static void ProcessReceive(SocketAsyncEventArgs e)
{
AsyncUserToken token = (AsyncUserToken)e.UserToken;
//接收到訊息
if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
{
Interlocked.Add(ref m_totalBytesRead, e.BytesTransferred);
byte[] data = new byte[e.BytesTransferred];
Array.Copy(e.Buffer, e.Offset, data, 0, e.BytesTransferred);
var recestr = Encoding.UTF8.GetString(data);
var Replaystr =
$"Server收到訊息:{recestr};Server收到訊息的時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff")}";
Console.WriteLine(Replaystr);
var strbytes = Encoding.UTF8.GetBytes(Replaystr);
Array.Copy(strbytes, 0, token.writeEventArgs.Buffer, token.writeEventArgs.Offset,
strbytes.Length);
//完成埠模型處理傳送
bool willRaiseEvent = token.Socket.SendAsync(token.writeEventArgs);
if (!willRaiseEvent)
{
ProcessSend(token.writeEventArgs);
}
}
else
{
CloseClientSocket(e);
}
}
//如果非同步完成,有執行緒池中執行緒執行
public static void ProcessAccept(SocketAsyncEventArgs e)
{
Interlocked.Increment(ref m_numConnectedSockets);
//從池中獲取資料
SocketAsyncEventArgs readEventArgs = m_ReadPool.Pop();
SocketAsyncEventArgs writeEventArgs = m_WritePool.Pop();
((AsyncUserToken)readEventArgs.UserToken).Socket = e.AcceptSocket;
((AsyncUserToken)readEventArgs.UserToken).readEventArgs = readEventArgs;
((AsyncUserToken)readEventArgs.UserToken).writeEventArgs = writeEventArgs;
((AsyncUserToken)writeEventArgs.UserToken).Socket = e.AcceptSocket;
((AsyncUserToken)writeEventArgs.UserToken).readEventArgs = readEventArgs;
((AsyncUserToken)writeEventArgs.UserToken).writeEventArgs = writeEventArgs;
//使用完成埠模型接收資料
bool willRaiseEvent = e.AcceptSocket.ReceiveAsync(readEventArgs);
if (!willRaiseEvent)
{
ProcessReceive(readEventArgs);
}
StartAccept(e);
}
public static void StartAccept(SocketAsyncEventArgs listenEventArg)
{
if (listenEventArg == null)
{
//完成埠模型需要藉助SocketAsyncEventArgs
listenEventArg = new SocketAsyncEventArgs();
//設定完成埠的回撥
listenEventArg.Completed += new EventHandler<SocketAsyncEventArgs>((sender, e) => ProcessAccept(e));
}
else
{
listenEventArg.AcceptSocket = null;
}
m_maxNumberAcceptedClients.WaitOne();
bool willRaiseEvent = listenSocket.AcceptAsync(listenEventArg);
//如果同步完成返回False,非同步完成返回True,觸發Completed事件
if (!willRaiseEvent)
{
ProcessAccept(listenEventArg);
}
}
}
class AsyncUserToken
{
/// <summary>
/// 通訊SOKET
/// </summary>
public Socket Socket { get; set; }
/// <summary>
/// 讀SocketAsyncEventArgs
/// </summary>
public SocketAsyncEventArgs readEventArgs { set; get; }
/// <summary>
/// 寫SocketAsyncEventArgs
/// </summary>
public SocketAsyncEventArgs writeEventArgs { set; get; }
}
相對於前幾個模型,基於任務的網路模型是比較新的模型,但是效能是最好的,最主要的原因是微軟提供了System.Net.Sockets.SocketTaskExtensions封裝TAP的非同步方法;System.IO.Pipelines管道模型,在 .NET 中執行高效能 I/O 更加容易。該管道可以實現流量控制和反壓。PipeScheduler可以進行回撥執行緒控制。PipeReader和PipeWriter封裝了對記憶體資料的直接操作,實現零拷貝得以大大提供業務流的效能。可惜的是應用範圍比較小,目前框架只支援2.1, 2.2, 3.0, 3.1, 5, 6, 7 Preview 7
,Framework不支援。
private static Pipe pipe;
public static Socket Run(string m_ip, int m_port)
{
//監聽Socket
var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
var ip = IPAddress.Parse(m_ip);
var endpoint = new IPEndPoint(ip, m_port);
socket.Bind(endpoint); //繫結埠和IP
socket.Listen(200); //允許同時監聽的佇列
socket.ReceiveTimeout = -1;
Task.Run(async () =>
{
while (true)
{
var acceptSocket = await socket.AcceptAsync(); //TAP非同步接收
if (acceptSocket != null && acceptSocket.Connected)
{
pipe = new Pipe();
var writer = pipe.Writer;
var reader = pipe.Reader;
var writetaskr = Task.Run(async () =>
{
while (true)
{
var memory = writer.GetMemory(1024);
try
{
//TAP 非同步讀取資料
int bytesRead = await acceptSocket.ReceiveAsync(memory, SocketFlags.None);
if (bytesRead == 0)
{
break;
}
//告訴 PipeWriter 寫入多少資料。
writer.Advance(bytesRead);
}
catch (Exception e)
{
break;
}
//重新整理寫入
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// 完成寫入
await writer.CompleteAsync();
}).ContinueWith((t) =>
{
acceptSocket?.Shutdown(SocketShutdown.Both);
//acceptSocket?.Disconnect(true);
acceptSocket?.Dispose();
acceptSocket = null;
});
var readingtask= Task.Run(async() =>
{
while (true)
{
try
{
//從管道中讀取
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))//解析
{
ProcessLine(acceptSocket, line);
}
//實際讀了多少
reader.AdvanceTo(buffer.Start, buffer.End);
//是否寫已經結束
if (result.IsCompleted)
{
break;
}
}
catch (Exception e)
{
break;
}
}
await reader.CompleteAsync();
}).ContinueWith((t) =>
{
acceptSocket?.Shutdown(SocketShutdown.Both);
acceptSocket?.Dispose();
acceptSocket = null;
}
);
}
}
});
return socket;
}
主要講述在通訊端程式設計中,如何實現非同步和複用以提高效能。講述了非同步程式設計(APM)、基於事件的非同步模型(EAP)和基於任務的非同步模型(TAP);複用方面從記憶體池(ArrayPool),到自定義構建記憶體池(利用快取記憶體)和完成埠池,再到最新的管道模型,實現零拷貝。
如果覺得還不錯就關注一下吧!
我的公眾號:
您的資助是我最大的動力!
金額隨意,歡迎來賞!