輕量通訊協定 --- MQTT

2023-10-11 06:01:13

介紹

一、MQTT簡介

MQTT(Message Queuing Telemetry Transport) 是一種輕量級的訊息傳輸協定,通常用於在物聯網(IoT)和感測器網路中進行通訊。它設計用於在低頻寬、不穩定或高延遲的網路環境下傳輸資料,因此非常適用於連線裝置之間的通訊,尤其是在資源有限的環境中。

MQTT 的主要特點包括以下幾點:

  1. 輕量級:MQTT 協定本身非常簡潔,訊息頭部佔用較少的頻寬,使其在低頻寬網路中執行效率高。

  2. 釋出/訂閱模型:MQTT 使用釋出/訂閱模型,其中使用者端可以訂閱特定的主題(Topic),並接收與該主題相關的訊息。釋出者釋出訊息到特定主題,然後所有訂閱了該主題的使用者端都將收到該訊息。

  3. 可靠性:MQTT 支援三種不同級別的訊息傳輸質量,包括最多一次、至少一次和僅一次傳輸,可根據應用需求選擇合適的級別。

  4. 持久對談:MQTT 允許使用者端建立持久對談,以便在連線丟失後重新連線時能夠恢復之前的訂閱和訊息傳遞狀態。

  5. QoS(Quality of Service):MQTT 提供不同的 QoS 級別,以確保訊息的可靠傳遞。這包括 QoS 0(最多一次傳輸)、QoS 1(至少一次傳輸)和 QoS 2(僅一次傳輸)。

  6. 適應性:MQTT 可以在多種網路協定上執行,包括 TCP/IP、WebSocket 和其他協定。

總之,MQTT 是一種非常適合物聯網和感測器網路的通訊協定,因其輕量級和高效的特性而受到廣泛應用。它允許裝置之間實時地交換資訊,從而支援各種應用,包括智慧家居、工業自動化、農業監測等。

二、MQTT 的 QoS 機制

什麼是 QoS 機制?(https://www.emqx.com/zh/blog/introduction-to-mqtt-qos)

很多時候,使用 MQTT 協定的裝置都執行在網路受限的環境下,而只依靠底層的 TCP 傳輸協定,並不能完全保證訊息的可靠到達。因此,MQTT 提供了 QoS 機制,其核心是設計了多種訊息互動機制來提供不同的服務質量,來滿足使用者在各種場景下對訊息可靠性的要求。

MQTT 定義了三個 QoS 等級,分別為:

  • QoS 0,最多交付一次。
  • QoS 1,至少交付一次。
  • QoS 2,只交付一次。

其中,使用 QoS 0 可能丟失訊息,使用 QoS 1 可以保證收到訊息,但訊息可能重複,使用 QoS 2 可以保證訊息既不丟失也不重複。QoS 等級從低到高,不僅意味著訊息可靠性的提升,也意味著傳輸複雜程度的提升。

MQTT 的.Net 庫 --- MQTTnet

MQTTnet是一個開源的用於基於MQTT的通訊的高效能.NET庫。它提供了一個MQTT使用者端和一個MQTT伺服器(代理),並支援MQTT協定,直到版本5。它與大多數受支援的.NET相容框架版本和CPU體系結構。

Guthub地址: https://github.com/dotnet/MQTTnet

MQTTnet通過NuGet軟體包管理器交付。可以在這裡找到軟體包:https://www.nuget.org/packages/MQTTnet/

在Visual Studio中,在Package Manager控制檯中使用以下命令手動安裝MQTTnet:

Install-Package MQTTnet

可以通過GitHub上直接檢視Demo原始碼,或者下載原始碼後使用Visual Studio開啟,它提供了多個Samples ,每個Samples下有不同的相關方法,有以下幾類:

  • Client_Connection_Samples ---
  • Client_Publish_Samples
  • Client_Subscribe_Samples
  • Logger_Samples
  • Managed_Client_Simple_Samples
  • PackageInspection_Samples
  • RpcClient_Samples
  • Server_ASP_NET_Samples
  • Server_Diagnostics_Samples
  • Server_Intercepting_Samples
  • Server_Retained_Messages_Samples
  • Server_Simple_Samples
  • Server_TLS_Samples

可以下載原始碼編譯,執行起來後如下:

Windows下MQTT訊息伺服器的安裝使用

一般,常見的MQTT伺服器軟體有:

  • Mosquitto - 流行的開源MQTT伺服器,但是沒有視覺化介面,需要藉助其他工具才可以視覺化。

  • EMQX - 強大的開源MQTT伺服器,有視覺化介面。

  • HiveMQ - HiveMQ 是一個商業的MQTT伺服器,提供免費的開發者版。

這裡推薦使用EMQX ,它提供了視覺化介面,以便更容易地設定、管理和監控MQTT伺服器。

一、下載EMQX

EMQX 官網提供了豐富的檔案,Quick Start 地址:https://www.emqx.io/docs/zh/v5.2/

這裡不建議安裝最新版本,建議降低版本,若安裝最新版本 emqx-5.3.0-windows-amdx64,則會啟動異常,如下所示:


本次測試使用 emqx-4.4.19-otp24.3.4.6-windows-amd64 版本,如下:

按照官網教學,進入到安裝目錄/emqx/bin 下,使用以下指令啟動EMQX :

emqx start

二、啟動EMQX 服務

EMQX 常用啟動命令:

命令 描述
start 以守護行程模式啟動 EMQX,執行期間不需要互動式 shell。
console 在 Erlang 或 Elixir 互動式 shell 中啟動 EMQX。用於在開發環境中偵錯 EMQX,需要與 EMQX 進行互動。
foreground 在前臺模式下啟動 EMQX,不使用互動式 shell。用於在開發環境中啟動 EMQX,但不需要後臺執行。
stop 停止執行中的 EMQX 節點。
ctl 管理和監控 EMQX,執行 emqx ctl help 可以獲取更多詳細資訊。

EMQX 常用ctl命令:

命令 描述
status 快速檢視當前執行的節點是否執行。
broker 檢視當前節點的執行的版本狀態以及執行時長。
observer 可以用於檢視執行時狀態。展示一個類似於 linux 的 top 命令的介面。
admins 用於建立、修改、刪除管理員賬戶。
clients 檢視和管理使用者端。
topics 檢視當前系統中所有訂閱的主題。
subscriptions 檢視、增加或者刪除某個使用者端的訂閱。

三、EMQX Dashboard

EMQX Dashboard 是EMQX內建的Web 應用程式,它支援檢視執行中的 EMQX 叢集的整體連線數,訂閱主題數,訊息收發數量和流入流出速率,還包括節點列表和節點資訊和一些系統指標資訊,同時也可以對一些使用者端連線與訂閱資料進行檢視與管理

如果 EMQ 安裝在本機,則使用瀏覽器開啟地址 http://127.0.0.1:18083 ,輸入預設使用者名稱 admin 與預設密碼 public ,登入進入 Dashboard,如下圖:

如果忘記了 Dashboard 登入密碼,可以通過 cli 的 admins 命令進行重置,詳情請參考 命令列 - admins

./bin/emqx ctl admins passwd <Username> <Password>

四、MQTTX Desktop

MQTTX 使用者端是一款跨平臺的 MQTT 桌面使用者端工具。它提供使用者友好的圖形介面,讓使用者可以快速建立、測試 MQTT 連線,並進行MQTT 訊息的釋出和訂閱。下載地址:https://mqttx.app/zh/downloads 介面如下圖:

使用者端程式碼編寫

一、準備工作

接下來 我們使用MQTTnet,編寫伺服器端和使用者端測試一下:

  1. 新建控制檯專案,新增MQTTnet庫。

  2. 按照上文中命令啟動EMQX服務

  3. 使用MQTTX Desktop,設定 hostlocalhostprot1883 ,連線服務,如下圖:

二、程式碼編寫

這樣準備工作就做好了,編寫建立釋出使用者端程式碼,如下:

public static async Task CreatePublishMQTTClient()
{
    try
    {
        MqttFactory mqttFactory = new MqttFactory();

        var mqttClient = mqttFactory.CreateMqttClient();

        var mqttClientOptions = new MqttClientOptionsBuilder()
            .WithTcpServer("localhost", 1883)
            .WithClientId("Client1")
            .Build();
        var connectResult = await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);

        Console.WriteLine("mqttClient connectResult: " + connectResult.ResultCode.ToString());

        while (true)
        {
            var msg = Console.ReadLine();

            string topic = "testtopic/publish";
            string payload = $"{msg} {DateTime.Now:yyyy-MM-dd HH:mm:ss:fff}"; // 訊息內容

            var message = new MqttApplicationMessageBuilder()
                .WithTopic(topic)
                .WithPayload(payload)
                .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce) // 設定訊息質量
                .WithRetainFlag(false) // 是否保留訊息
                .Build();

            await mqttClient.PublishAsync(message, CancellationToken.None);
        }

    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.ToString());
    }
}

接下來再編寫一個訂閱使用者端程式碼:

public static async Task CreateSubscribeMQTTClient()
{
    try
    {
        MqttFactory mqttFactory = new MqttFactory();

        var mqttClient = mqttFactory.CreateMqttClient();

        var mqttClientOptions = new MqttClientOptionsBuilder()
            .WithTcpServer("localhost", 1883)
            .WithClientId("Client1")
            .Build();


        mqttClient.ApplicationMessageReceivedAsync += (e) =>
        {
            Task task = Task.Factory.StartNew(() =>
            {
                var msgArray = e.ApplicationMessage.Payload;
                string result = Encoding.UTF8.GetString(msgArray);
                Console.WriteLine("Received: " + result);
            });

            return task;
        };

        var connectResult = await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);

        Console.WriteLine("mqttClient connectResult: " + connectResult.ResultCode.ToString());

        string topic = "testtopic/subscribe";

        var subscribeOptions = new MqttClientSubscribeOptionsBuilder()
            .WithTopicFilter(topic)
            .Build();

        await mqttClient.SubscribeAsync(subscribeOptions);

    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.ToString());
    }
}

接下來編寫控制檯Main方法,由於MQTT Client方法是非同步的,所以為了避免控制檯退出,在呼叫方法後,增加了一個While 死迴圈保證控制檯程式是啟用狀態,程式碼如下:

static void Main(string[] args)
{
    Console.WriteLine("Choose a creation type: \r\n 1: PublishClient\r\n 2: SubscribeClient");
    var type = Console.ReadLine();
    switch (type)
    {
        case "1":
            _ = CreatePublishMQTTClient();
            break;
        case "2":
            _ = CreateSubscribeMQTTClient();
            break;

    }
    while (true) Thread.Sleep(1000);
}

三、測試

先測試釋出使用者端,在控制檯選擇PublishClient,然後等待連線,可以看到連線結果為Success,傳送兩條測試訊息,可以看到MQTTX Desktop 均收到。

接下來測試訂閱使用者端,在控制檯選擇SubscribeClient,然後等待連線,可以看到連線結果為Success,在MQTTX Desktop 釋出一條訊息給訂閱使用者端,可以看到控制檯程式中,接收到了測試訊息。

總結

總的來說, 使用C#編寫 MQTT相關程式碼的資料還是比較少的,但好在官方檔案足夠詳細,今天試玩一下還是花費不少功夫的。本篇文章作拋磚引玉,淺淺瞭解MQTT這個輕量級的通訊協定,在輔以Demo加深理解,熟悉如何使用,文章末尾也提供諸多參考文章,方便大家借鑑學習。

參考連結

MQTTnet Guthub地址: https://github.com/dotnet/MQTTnet

MQTT 入門指南:https://www.emqx.com/zh/mqtt-guide

EMQX 官方檔案:https://www.emqx.io/docs/zh/v5.2/

EMQX 命令列檔案:https://www.emqx.io/docs/zh/v5.2/admin/cli.html

EMQX 設定手冊:https://www.emqx.io/docs/zh/v5.2/configuration/configuration-manual.html

EMQX基礎功能: https://juejin.cn/post/7081629128650129416

MQTTX 使用者端下載:https://mqttx.app/zh/downloads