c# 非同步進階————channel [一]

2022-08-30 12:00:47

前言

該系列為非同步程式設計的進階篇,其實也不能這麼講。世界上本沒有進階篇,只能說是高階篇(高階篇不能說多高階,是對底層的封裝的意思),只要是加深理解都是進階。

本章先介紹一下channel。

正文

下面沒什麼好說的,把檔案貼一下。

https://docs.microsoft.com/zh-cn/dotnet/api/system.threading.channels?view=net-6.0

Channels 是做什麼的呢?

提供用於在生成者和使用者之間以非同步方式傳遞資料的一組同步資料結構。

這裡面有同步又有非同步,到底該怎麼理解呢?

首先這裡面有生成者和使用者兩個概念。

那麼怎麼理解同步資料呢? 就是說使用者是按照生成者的生成資料順序進行使用的。

然後這個非同步方式傳遞資料是怎麼回事呢? 那就是說比如生成者生產了一條訊息,然後使用者使用了,然後生產者才能繼續生成,那麼就是同步,反之就是非同步。

為什麼用同步來舉例,然後反之就是非同步呢? 因為非同步的情況太多了。

那麼這裡就解釋完了。

然後這裡要說明一點的就是有些初學者難理解這裡說的channel 是一種資料結構,裡面不是有方法嗎?

先說下資料結構的含義:

A data structure is a storage that is used to store and organize data. It is a way of arranging data on a computer so that it can be accessed and updated efficiently.

資料結構是一種用來儲存和組織資料的記憶體。然後一種編排資料的方法用來存取和更新資料。

所以資料結構不僅僅是用來儲存的,裡面還有組織資料的能力。

比如我們的陣列、集合啊,都是資料結構。 其實就兩個特徵,一個是儲存,另外一個是組織。

然後裡面有上面這些類哈。

來看下第一個類:

BoundedChannelOptions 繼承自channelOptions。

那麼還是先看channelOptions。

這裡面除了AllowSynchronousContinuations,其他含義很清楚了,就不看了。這個AllowSynchronousContinuations 又很繞,等下實踐的時候再看下。

BoundedChannelOptions 從表面意思是有邊界的設定, 比 channeloptions 多了兩個東西。

裡面就是限制管道中的最大訊息數,另一個就是如果到達訊息數後,怎麼處理的問題。

處理方式有很多種:

然後思考一個問題,那就是為什麼要限制裡面的訊息數。

這裡面和快取一個道理,比如說不斷的往裡面加入訊息數,且消費者無法消費完,然後記憶體就會不斷的升高,因為我們的記憶體有限,不然做到無限快取。

而且還有一個問題,那就是如果消費不完,然後不斷往裡面增加資料還會增加io成本,所以說應該考慮自己的消費情況,來設定最大訊息數。

channel 提供了有邊界選項,同樣也提供了無邊界的UnboundedChannelOptions 。

這裡有人就會問了, 為啥要無邊界的,上面不是說要設定有邊界的,無邊界不是會有問題嗎?

是的,無邊界的確會有問題。

這個無邊界的選擇意思是讓你自己控制生產速度。

舉個例子,比如說你需要進行對流量控制,每條訊息的大小都不一樣,那麼這個時候你對訊息數進行限制,就是達不到效果的。

這裡要說明的無邊界不是真的讓你放飛自我。如果你肯定消費者一定能達到預期的消費,那麼你也可以不限制,但是最好不要這麼做。

因為channel 畢竟是單臺機器的使用, 不像kafka這種叢集模式的,具備很大的儲存能力。僅個人建議,如有不同想法可以交流一下。

channel 這個類,就是一個構造器,建立有邊界的channel 和無邊界的。

建立出來的channel 有 讀取器和寫入器:

至於例子,在網上找了一個例子:

using System.Threading.Channels;

var channel = Channel.CreateUnbounded<int>();

Task.Run(async () =>
{
    for (int i = 0; i < 10; i++)
    {    
        await Task.Delay(TimeSpan.FromMilliseconds(200));
        await channel.Writer.WriteAsync(i);// 生產者寫入訊息
        if (i > 5)
        {
            channel.Writer.Complete(); //生產者也可以明確告知消費者不會傳送任何訊息了
        }
    }

});

Task.Run(async () =>
{
    await foreach (var item in channel.Reader.ReadAllAsync())//async stream,在沒有被生產者明確Complete的情況下,這裡會一致阻塞下去
    {
        Console.WriteLine(item);
    }
    Console.WriteLine("done");
});

Console.ReadKey();

這個例子比較簡單,後面會編寫一個kafka 批次消費的庫,裡面用到了這個channel,到時候可以交流一下。

該系列不斷更新,主要是介紹一下一些非同步程式設計高階部分(不是指內容多高階而是指上層應用),內容偏設計方面,實戰例子會在後面的開源中體現。