【.NET+MQTT】.NET6 環境下實現MQTT通訊,以及伺服器端、使用者端的雙邊訊息訂閱與釋出的程式碼演示

2022-07-04 06:01:32

前言: MQTT廣泛應用於工業物聯網、智慧家居、各類智慧製造或各類自動化場景等。MQTT是一個基於使用者端-伺服器的訊息釋出/訂閱傳輸協定,在很多受限的環境下,比如說機器與機器通訊、機器與物聯網通訊等。好了,科普的廢話不多說,下面直接通過.NET環境來實現一套MQTT通訊demo,實現伺服器端與使用者端的雙邊訊息釋出與訂閱的功能和演示。

 

開發環境:

VS2022 + .NET 6 + Webapi / 控制檯

 

1、新建一個webapi專案,用來後面做測試使用

 

2、新建一個繼承自IHostedService的服務,用於隨著webapi程式的啟動而自動執行。(最終程式碼在文末)

 

3、引入 MQTTNet 包,該專案提供了.net環境下的MQTT通訊協定支援,這款框架很優秀,此處直接參照它來進行使用。

 

4、在上面的MqttHostService類裡面,開始方法裡面新增初始化MQTT伺服器端的一些功能,例如 IP、埠號、事件等等。

 

5、mqtt伺服器端支援的一系列功能很多,大佬們可以自行去嘗試一些新發現,此處只使用若干個簡單功能。

 

6、新增使用者端連線事件、連線關閉事件

 

7、由於事件要用的可能有點多,此處就不一一例舉了,可以直接看以下的程式碼,以及有關注釋來理解。

 

8、事件觸發時候,列印輸出

 

9、輸出之前,記錄一個當前事件名稱標記一下,用於可以更加清楚看出是哪個事件輸出的。

 

10、對MqttHostService類進行註冊,用於程式啟動時候跟隨啟動。

 

11、上面貌似設計的不是很友好,所以把mqtt服務範例單獨弄出來,寫入到單獨的類裡面做成屬性,供方便呼叫。

 

12、把先前的一些東西改一下,換成使用上面步驟的屬性來直接呼叫使用。

 

13、執行一下,看看是否可以成功,顯示服務已啟動,說明服務啟動時OK的了.

 

14、新增一個控制檯程式 MqttClient,用於模擬使用者端。

 

15、建立使用者端啟動以及有關設定資訊和有關事件,如圖。具體使用可以看程式碼註釋,就不過多解釋了。

 

16、在program類裡面,呼叫使用者端啟動方法,用於測試使用。

 

17、上面使用者端對應的三個事件的實現如圖,同時進行有關資訊的列印輸出。

 

18、啟動伺服器端,然後啟動使用者端,可以看到伺服器端有一個連線失敗的訊息,這個是因為上面設定的使用者端使用者名稱是admin,密碼是1234567,而伺服器端設定的規則是,使用者名稱是admin  密碼是123456

 

19、密碼改回正常匹配項以後,再重新執行試試看,可以看到使用者端與伺服器端連線上了。

 

20、如果關閉使用者端,也可以看到伺服器端會進入使用者端關閉事件內。

 

21、把上面主題訂閱的內容寫到連線成功以後的事件裡面,不然使用者端連線期間,可能就執行了主題訂閱,會存在訂閱失敗的情況。改為寫入到連線成功以後的事件裡面,可以保證主題訂閱肯定是在使用者端連線成功以後才執行的。

 

22、接下來測試伺服器端訊息推播,在MqttService服務裡面,新增一個方法,用來執行mqtt伺服器端釋出主題訊息使用。有關設定資訊和訊息格式,如圖所示。

 

23、新增一個API控制器,用來測試使用。API引數直接拿來進行訊息的推播使用。

 

24、執行伺服器端和使用者端,並存取剛剛新增的api介面,手動隨意輸入一條訊息,可以看到使用者端訂閱的主題訊息已經被實時接收到了。

 

25、接下來對使用者端新增一個訊息推播的方法,用來測試使用者端訊息釋出的功能。有關訊息格式和呼叫,如圖所示,以及註釋部分的說明。

 

26、使用者端program類裡面,使用者端連線以後,通過手動回車,來執行使用者端釋出訊息。

 

27、再次啟動伺服器端和使用者端

 

28、然後使用者端內按一下回車,執行訊息釋出功能。可以看到,伺服器端成功接收到了使用者端發過來的主題訊息。

 

29、接下來測試使用者端與使用者端之間的訊息釋出與訂閱,為了模擬多使用者端效果,把上面使用者端已經編譯好的檔案拷貝一份出來。

 

30、然後原生的程式碼進行一些修改,用來當做第二個使用者端程式。所以使用者端id也進行變更為 testclient02

 

31、對使用者端訂閱的主題,也改成 topic_02

 

32、啟動伺服器端,以及拷貝出來的使用者端1,和上面修改了部分程式碼的使用者端2,保證都已經連線上伺服器端。

 

33、呼叫伺服器端的api介面,由於伺服器端釋出的訊息是釋出給topic_01的,所以只有使用者端1可以接收到訊息。

 

34、使用者端1執行回車,用於釋出一段訊息給主題 topic_02,可以看到使用者端01釋出的訊息,同時被伺服器端和使用者端02接收到了。因為伺服器端是總指揮,所以使用者端釋出的訊息都會經過伺服器端,從而伺服器端都可以接收到連線的使用者端釋出的所有訊息。

 

35、測試資料保持,下面先對使用者端1進行斷開,然後再重新連線使用者端1,可以看到使用者端1直接接收到了它訂閱的主題的上一次最新的訊息內容,這個就是訊息裡面,Retain屬性設為True的結果,用於讓伺服器端記憶該主題訊息使用的。如果設為false,就沒有這個效果了,大佬們也可以自己嘗試。

 

36、最終的伺服器端程式碼:

MqttHostService:

  public class MqttHostService : IHostedService, IDisposable
    {
        public void Dispose()
        {
            
        }
        const string ServerClientId = "SERVER";
        public Task StartAsync(CancellationToken cancellationToken)
        {
            MqttServerOptionsBuilder optionsBuilder = new MqttServerOptionsBuilder();
            optionsBuilder.WithDefaultEndpoint();
            optionsBuilder.WithDefaultEndpointPort(10086); // 設定 伺服器端 埠號
            optionsBuilder.WithConnectionBacklog(1000); // 最大連線數
            MqttServerOptions options = optionsBuilder.Build();

            MqttService._mqttServer = new MqttFactory().CreateMqttServer(options);

            MqttService._mqttServer.ClientConnectedAsync += _mqttServer_ClientConnectedAsync; //使用者端連線事件
            MqttService._mqttServer.ClientDisconnectedAsync += _mqttServer_ClientDisconnectedAsync; // 使用者端關閉事件
            MqttService._mqttServer.ApplicationMessageNotConsumedAsync += _mqttServer_ApplicationMessageNotConsumedAsync; // 訊息接收事件

            MqttService._mqttServer.ClientSubscribedTopicAsync += _mqttServer_ClientSubscribedTopicAsync; // 使用者端訂閱主題事件
            MqttService._mqttServer.ClientUnsubscribedTopicAsync += _mqttServer_ClientUnsubscribedTopicAsync; // 使用者端取消訂閱事件
            MqttService._mqttServer.StartedAsync += _mqttServer_StartedAsync; // 啟動後事件
            MqttService._mqttServer.StoppedAsync += _mqttServer_StoppedAsync; // 關閉後事件
            MqttService._mqttServer.InterceptingPublishAsync += _mqttServer_InterceptingPublishAsync; // 訊息接收事件
            MqttService._mqttServer.ValidatingConnectionAsync += _mqttServer_ValidatingConnectionAsync; // 使用者名稱和密碼驗證有關

            MqttService._mqttServer.StartAsync();
            return Task.CompletedTask;
        }

        /// <summary>
        /// 使用者端訂閱主題事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttServer_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg)
        {
            Console.WriteLine($"ClientSubscribedTopicAsync:使用者端ID=【{arg.ClientId}】訂閱的主題=【{arg.TopicFilter}】 ");
            return Task.CompletedTask;
        }

        /// <summary>
        /// 關閉後事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttServer_StoppedAsync(EventArgs arg)
        {
            Console.WriteLine($"StoppedAsync:MQTT服務已關閉……");
            return Task.CompletedTask;
        }

        /// <summary>
        /// 使用者名稱和密碼驗證有關
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttServer_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg)
        {
            arg.ReasonCode = MqttConnectReasonCode.Success;
            if ((arg.Username ?? string.Empty)!="admin" || (arg.Password??String.Empty)!="123456")
            {
                arg.ReasonCode = MqttConnectReasonCode.Banned;
                Console.WriteLine($"ValidatingConnectionAsync:使用者端ID=【{arg.ClientId}】使用者名稱或密碼驗證錯誤 ");

            }
            return Task.CompletedTask;
        }

        /// <summary>
        /// 訊息接收事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
        {
            if (string.Equals(arg.ClientId, ServerClientId))
            {
                return Task.CompletedTask;
            }

            Console.WriteLine($"InterceptingPublishAsync:使用者端ID=【{arg.ClientId}】 Topic主題=【{arg.ApplicationMessage.Topic}】 訊息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等級=【{arg.ApplicationMessage.QualityOfServiceLevel}】");
            return Task.CompletedTask;

        }

        /// <summary>
        /// 啟動後事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttServer_StartedAsync(EventArgs arg)
        {
            Console.WriteLine($"StartedAsync:MQTT服務已啟動……");
           return Task.CompletedTask;
        }

        /// <summary>
        /// 使用者端取消訂閱事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttServer_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg)
        {
            Console.WriteLine($"ClientUnsubscribedTopicAsync:使用者端ID=【{arg.ClientId}】已取消訂閱的主題=【{arg.TopicFilter}】  ");
            return Task.CompletedTask;
        }

        private Task _mqttServer_ApplicationMessageNotConsumedAsync(ApplicationMessageNotConsumedEventArgs arg)
        {
            Console.WriteLine($"ApplicationMessageNotConsumedAsync:傳送端ID=【{arg.SenderId}】 Topic主題=【{arg.ApplicationMessage.Topic}】 訊息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等級=【{arg.ApplicationMessage.QualityOfServiceLevel}】");
            return Task.CompletedTask;

        }

        /// <summary>
        /// 使用者端斷開時候觸發
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        /// <exception cref="NotImplementedException"></exception>
        private Task _mqttServer_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg)
        {
            Console.WriteLine($"ClientDisconnectedAsync:使用者端ID=【{arg.ClientId}】已斷開, 地址=【{arg.Endpoint}】  ");
            return Task.CompletedTask;

        }

        /// <summary>
        /// 使用者端連線時候觸發
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttServer_ClientConnectedAsync(ClientConnectedEventArgs arg)
        {
            Console.WriteLine($"ClientConnectedAsync:使用者端ID=【{arg.ClientId}】已連線, 使用者名稱=【{arg.UserName}】地址=【{arg.Endpoint}】  ");
            return Task.CompletedTask;
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
           return Task.CompletedTask;
        }
    }

 

MqttService:

 public class MqttService
    {
        public static MqttServer _mqttServer { get; set; }

        public static void PublishData(string data)
        {
            var message = new MqttApplicationMessage
            {
                Topic = "topic_01",
                Payload = Encoding.Default.GetBytes(data),
                QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce,
                Retain = true  // 伺服器端是否保留訊息。true為保留,如果有新的訂閱者連線,就會立馬收到該訊息。
            };

            _mqttServer.InjectApplicationMessage(new InjectedMqttApplicationMessage(message) // 傳送訊息給有訂閱 topic_01的使用者端
            {
                SenderClientId = "Server_01"
            }).GetAwaiter().GetResult();
        }

    }

 

37、最終的使用者端程式碼:

MqttClientService:

public class MqttClientService
    {
        public static IMqttClient _mqttClient;
        public void MqttClientStart()
        {
            var optionsBuilder = new MqttClientOptionsBuilder()
                .WithTcpServer("127.0.0.1", 10086) // 要存取的mqtt伺服器端的 ip 和 埠號
                .WithCredentials("admin", "123456") // 要存取的mqtt伺服器端的使用者名稱和密碼
                .WithClientId("testclient02") // 設定使用者端id
                .WithCleanSession()
                .WithTls(new MqttClientOptionsBuilderTlsParameters
                {
                    UseTls = false  // 是否使用 tls加密
                });

            var clientOptions = optionsBuilder.Build();
            _mqttClient = new MqttFactory().CreateMqttClient();

            _mqttClient.ConnectedAsync += _mqttClient_ConnectedAsync; // 使用者端連線成功事件
            _mqttClient.DisconnectedAsync += _mqttClient_DisconnectedAsync; // 使用者端連線關閉事件
            _mqttClient.ApplicationMessageReceivedAsync += _mqttClient_ApplicationMessageReceivedAsync; // 收到訊息事件

            _mqttClient.ConnectAsync(clientOptions);


        }

        /// <summary>
        /// 使用者端連線關閉事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
        {
            Console.WriteLine($"使用者端已斷開與伺服器端的連線……");
            return Task.CompletedTask;
        }

        /// <summary>
        /// 使用者端連線成功事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
        {
            Console.WriteLine($"使用者端已連線伺服器端……");

            // 訂閱訊息主題
            // MqttQualityOfServiceLevel: (QoS):  0 最多一次,接收者不確認收到訊息,並且訊息不被傳送者儲存和重新傳送提供與底層 TCP 協定相同的保證。
            // 1: 保證一條訊息至少有一次會傳遞給接收方。傳送方儲存訊息,直到它從接收方收到確認收到訊息的封包。一條訊息可以多次傳送或傳遞。
            // 2: 保證每條訊息僅由預期的收件人接收一次。級別2是最安全和最慢的服務質量級別,保證由傳送方和接收方之間的至少兩個請求/響應(四次握手)。
            _mqttClient.SubscribeAsync("topic_02", MqttQualityOfServiceLevel.AtLeastOnce);

            return Task.CompletedTask;
        }

        /// <summary>
        /// 收到訊息事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
        {
            Console.WriteLine($"ApplicationMessageReceivedAsync:使用者端ID=【{arg.ClientId}】接收到訊息。 Topic主題=【{arg.ApplicationMessage.Topic}】 訊息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等級=【{arg.ApplicationMessage.QualityOfServiceLevel}】");
            return Task.CompletedTask;
        }

        public void Publish(string data)
        {
            var message = new MqttApplicationMessage
            {
                Topic = "topic_02",
                Payload = Encoding.Default.GetBytes(data),
                QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce,
                Retain = true  // 伺服器端是否保留訊息。true為保留,如果有新的訂閱者連線,就會立馬收到該訊息。
            };
            _mqttClient.PublishAsync(message);
        }
    }

 

38、後記:MQTT以上演示已經完畢,可以看到它的一些特性,跟websocket很接近,但是又比websocket通訊更加靈活。其實,實際上MQTT的使用者端在現實生產環境場景下,並不需要咱們開發者進行開發,很多硬體裝置都支援提供MQTT協定的通訊使用者端,所以只需要自己搭建一個伺服器端,就可以實現實時監控各種裝置推播過來的各種訊號資料。同時使用者端支援釋出訊息給其他使用者端,所以就實現了裝置與裝置之間的一對一訊號通訊的效果了。如果需要下發訊號給硬體裝置,MQTT伺服器端也可以直接下發給某個指定裝置來進行實現即可。上面案例只提供入門方案,如果有感興趣的大佬,可以自己去拓展一下,來達到更好的效果。

 

39、以上就是該篇文章的所有內容。如果覺得有幫助,歡迎轉發、點贊、推薦和評論留言。大佬們的鼓勵,是我不斷繼續創作部落格的動力之一。如果有興趣一起探索更多.net 技術,歡迎點選下方qq群,加入一起吹牛談人生。或者掃描下面我個人微信名片二維條碼加我好友,我也可以拉你到微信.net交流群。如果沒有找到二維條碼和QQ群連結,可能是你現在進入的文章是爬蟲爬走的文章,可以點選該文章原始地址[部落格園]的連結來跳轉回最初原文:https://www.cnblogs.com/weskynet/p/16441219.html