使用MASA Stack+.Net 從零開始搭建IoT平臺 第五章 使用時序庫儲存上行資料

2023-06-26 15:00:14

@


前言

我們可以將裝置上行資料儲存到關係型資料庫中,我們需要兩張帶有時間戳的表(最新資料表歷史資料表),歷史資料表儲存所有裝置上報的資料,最新資料表需要儲存裝置最新一條上報資料,這條最新資料相當於裝置的當前狀態。然後展示的時候只展示最新一條資料的狀態,報表查詢可以按照裝置id和時間從歷史資料表查詢彙總。
這樣是可以的,但是我們的最新資料表需要被頻繁的更新,資料量少的時候沒問題。但資料量大,並行高的時候就會出現問題。
1、儲存成本:資料不會被壓縮,導致佔用儲存資源。
2、維護成本:單表資料量太大時,需要人工分庫分表。
3、寫入效能:單機寫入吞吐量難以滿足大量上行資料的寫入需求,資料庫存在效能瓶頸。
4、查詢效能:資料量太大導致查詢效能受到影響。

分析

我們可以採用時序庫來解決上述問題,首先來了解一下什麼是時序資料。時序資料是按照時間維度進行索引的資料,它記錄了某個被測量實體在一定時間範圍內,每個時間點上的一組測試值。感測器上傳的室內PM2.5和甲醛資料、淨水器感測器當前的TDS值、計算機系統的監控資料等,都屬於時序資料,時序資料有如下特點:
1、資料量較大,寫入操作是持續且平穩的,而且寫多讀少。
2、只有寫入操作,幾乎沒有更新操作,比如去修改感測器的歷史資料,是沒有意義的。
3、沒有隨機刪除,即使刪除也是按照時間範圍進行刪除。刪除某一個時間點的資料沒有意義,但是刪除2年前的資料是有意義的。
4、資料實時性和時效性強,資料隨著時間的推移不斷追加,舊資料很快失去意義。
5、大部分以時間和實體為維度進行查詢,很少以測試值為維度查詢,比如使用者會查詢某個時間段的溫度資料,但是很少會去查詢溫度高於多少度的資料記錄。
顯然IoT的業務是符合使用時序庫的場景的。
序資料庫就是用來儲存時序資料的資料庫,時序資料庫相較於傳統的關係型資料和非關係型資料庫而言,專門優化了對時序資料的儲存,開源的時序資料庫有InfluxDB OpenTSDB、TimeScaleDB 等。本文以InfluxDB資料庫進行演示。
時序資料庫有如下幾個概念。
1.Metric:度量,相當於關係型資料庫中的表(table)。
2.Data Point:資料點,相當於關係型資料庫的中的行(row)。
3.Timestamp:時間戳,資料點生成時的時間戳。
4.Field:測量值,比如溫度和溼度、PM2.5等。
5.Tag:標籤,用於標識資料點,通常用來標識資料點的來源,比如溫度和溼度資料來自哪個房間,哪個裝置,可以當作關係型資料庫表的主鍵。

如下圖,度量為 Wind,每一個資料點都具有一個 timestamp,兩個 field:direction 和 speed,兩個 tag:sensor、city。它的第一行和第三行,存放的都是 sensor 號碼為 95D8-7913 的裝置,屬性城市是上海。隨著時間的變化,風向和風速都發生了改變,風向從 23.4 變成 23.2;而風速從 3.4 變成了 3.3。

圖片來自網路

實施步驟

時序庫的安裝

安裝參考官方檔案,為了方便,我這裡採用docker安裝

docker run --name influxdb -p 8086:8086 influxdb:2.7.0

https://docs.influxdata.com/influxdb/v2.7/install/

我們開啟 伺服器ip:8086 可以看到它自帶的管理介面,我們首先建立使用者名稱密碼,組織、以及Bucket的名稱。
這裡的bucket "IoTDemos" 相當於資料庫的名稱

我們記錄一下這個Token,一會連線influxdb需要,相當於賬號密碼

解決playload沒有時間戳問題

對於時序庫來講,時間戳是非常重要的,但是我們拿到的playload並沒有時間戳(MQTTNet包我沒有找到拿時間戳的方法)。
所以我們需要在mqtt上想辦法,讓裝置上報資料的時候,mqtt自動新增時間戳到playload中。
1、我們在資料整合->規則中新建一條規則名稱為"Add_Ts"。SQL編寫如下

SELECT
  *,
  now_timestamp('millisecond') as payload.Ts
FROM
  "topic/#"

topic/# 代表訊息釋出到"topic/#"主題的事件
now_timestamp函數返回當前時間的 Unix 時間戳,我們將時間戳寫入到payload的Ts屬性中,關於更多內建SQL函數,請參考官方檔案

https://www.emqx.io/docs/zh/v5.0/data-integration/rule-sql-builtin-functions.html

2、我們開啟下面的偵錯,模擬裝置上報一條資料,可以看到這條規則幫我們加入了時間戳。

3、然後我們還需要處理新增了時間戳的處理結果,我們在右側新增一個動作,選擇訊息重發布,將剛剛新增了時間戳的訊息重發到一個新的Topic上,我們使用topic/dp,並在playload中新增${payload},這樣我們就修改了playload中的資訊,新增了我們需要的時間戳,當然,我們Hub訂閱的訊息也需要對應修改,新增/dp字尾。

4、首先我們先修改MASA.IoT.Hub的組態檔,Topic新增"/dp"字尾

  "MqttSetting": {
...
    "Topic": "$share/IotHub/topic/+/dp"
  },

5、CallbackAsync中,因為我們裝置名稱是從Topic擷取的,也要對應修改一下。

    private async Task CallbackAsync(MqttApplicationMessageReceivedEventArgs e)
    {
        var deviceDataPointStr = System.Text.Encoding.Default.GetString(e.ApplicationMessage.PayloadSegment);

        Console.WriteLine(deviceDataPointStr);
        var pubSubOptions = new PubSubOptions
        {
            //修改一下獲取裝置名稱的方式
            DeviceName = e.ApplicationMessage.Topic[6..^3],
            Msg = deviceDataPointStr,
            PubTime = new DateTimeOffset(DateTime.Now).ToUnixTimeMilliseconds(),
            TrackId = Guid.NewGuid()
        };                            
...
    }

程式碼編寫

解決完時間戳的問題,我們就可以編寫程式碼向InfluxDB中寫入資料了,我們首先在Infrastructure資料夾下建立ITimeSeriesDbClient介面和TimeSeriesDbClient類,使用介面也方便我們日後更換其他的時序庫。
這裡使用了InfluxDB.Client包。
ITimeSeriesDbClient.cs

namespace MASA.IoT.Core.Infrastructure
{
    public interface ITimeSeriesDbClient
    {
        bool WriteMeasurement<T>(T measurement);
    }
}

TimeSeriesDbClient.cs

using InfluxDB.Client;
using InfluxDB.Client.Api.Domain;
using MASA.IoT.WebApi;
using Microsoft.Extensions.Options;

namespace MASA.IoT.Core.Infrastructure
{
    public class TimeSeriesDbClient : ITimeSeriesDbClient
    {
        private readonly InfluxDBClient _client;
        private readonly string _bucket;
        private readonly string _org;
        private readonly AppSettings _appSettings;
        
        public TimeSeriesDbClient(IOptions<AppSettings> settings)
        {
            _appSettings = settings.Value;
            _org = _appSettings.InfluxDBSetting.Org;
            _bucket = _appSettings.InfluxDBSetting.Bucket;
            _client = new InfluxDBClient(_appSettings.InfluxDBSetting.Url, _appSettings.InfluxDBSetting.Token);
        }

        public bool WriteMeasurement<T>(T measurement)
        {
            try
            {
                using var writeApi = _client.GetWriteApi();
                writeApi.WriteMeasurement<T>(measurement, WritePrecision.Ms, _bucket, _org);
                return true;
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
                return false;
            }
        }
    }
}

這裡使用new InfluxDBClient(_appSettings.InfluxDBSetting.Url, _appSettings.InfluxDBSetting.Token)來構造InfluxDBClient。
Token就是我們建立Bucket過程中儲存的Token
Url是我們InfluxDB的存取地址:http://127.0.0.1:8086
寫入的方法WriteMeasurement中我們通過_client.GetWriteApi建立一個寫入的api然後直接將我們要寫入的泛型實體寫入,第二個可選引數代表寫入精度,這裡我們使用WritePrecision.Ms
我們在DeviceHandler.cs中注入ITimeSeriesDbClient 並新增一個WriteMeasurementAsync方法,在方法中我們先根據裝置名稱獲取產品,如果識別產品ID為10001(空淨產品),
那麼我們就寫入資料到Measurement:AirPurifierDataPoint
Measurement相當於資料庫的表。
MeasurementColumn特性都是InfluxDB.Client.Core提供的,可以用來標識TagTimestamp

using InfluxDB.Client.Core;
using Newtonsoft.Json;

namespace MASA.IoT.Core.Contract
{
    [Measurement("AirPurifierDataPoint")]
    public class AirPurifierDataPoint
    {
        /// <summary>
        /// 裝置名稱
        /// </summary>
        [Column("DeviceName", IsTag = true)] public string DeviceName { get; set; }

        /// <summary>
        /// 產品ID
        /// </summary>
        [Column("ProductId", IsTag = true)] public Guid ProductId { get; set; }

        /// <summary>
        /// Pm2.5
        /// </summary>
        [Column("PM_25")] public double? Pm_25 { get; set; }
        /// <summary>
        /// 溫度
        /// </summary>
        [Column("Temperature")] public double? Temperature { get; set; }
        /// <summary>
        /// 溼度
        /// </summary>
        [Column("Humidity")] public double? Humidity { get; set; }
        /// <summary>
        /// 時間戳
        /// </summary>
        [JsonProperty(propertyName: "Ts")]
        [Column(IsTimestamp = true)] public long Timestamp { get; set; }
    }
}

    public class DeviceHandler : IDeviceHandler
    {
        private readonly MASAIoTContext _ioTDbContext;
        private readonly IMqttHandler _mqttHandler;
        private readonly ITimeSeriesDbClient _timeSeriesDbClient;

        public DeviceHandler(MASAIoTContext ioTDbContext, IMqttHandler mqttHandler, ITimeSeriesDbClient timeSeriesDbClient)
        {
            _ioTDbContext = ioTDbContext;
            _mqttHandler = mqttHandler;
            _timeSeriesDbClient = timeSeriesDbClient;
        }

        /// <summary>
        /// 寫入資料
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="pubSubOptions"></param>
        /// <returns></returns>
        public async Task<bool> WriteMeasurementAsync<T>(PubSubOptions pubSubOptions)
        {
            var device = await _ioTDbContext.IoTDeviceInfo.Include(o => o.ProductInfo).AsNoTracking()
                .FirstOrDefaultAsync(o => o.DeviceName == pubSubOptions.DeviceName);

            if (device != null && device.ProductInfo.ProductCode == "10001")  //空氣淨化器產品
            {
                var airPurifierDataPoint = JsonConvert.DeserializeObject<AirPurifierDataPoint>(pubSubOptions.Msg);

                airPurifierDataPoint.ProductId = device.ProductInfoId;
  
                return _timeSeriesDbClient.WriteMeasurement<AirPurifierDataPoint>(airPurifierDataPoint);

            }
            return false;
        }

除了WriteMeasurement方法之外,還提供了很多其他方法,如WritePoint,和批次寫入的方法,可自行測試。

測試

我們啟動專案,通過MQTTX向"topic/284202304230001"上報一條資料

{
  "DeviceName":"284202304230001",
  "Pm_25":100,
  "Temperature":25,
  "Humidity":50
}

我們在influxDB的管理工具中使用Data Explorer,使用如下的flux query查詢語句,即可查出5分鐘之內的資料,注意,這裡的時間是UTC時間

如果想顯示北京時區方便偵錯,可以在後面新增|> timeShift(duration: 8h)

from(bucket: "IoTDemos") 
|> range(start:-5m)


關於flux查詢語法

https://docs.influxdata.com/flux/v0.x/

總結

本節我們簡單介紹了開源時序資料庫influxDB的安裝。
我們藉助InfluxDB.Client庫完成裝置從上報到時序庫資料儲存的全過程,下一節我們介紹從時序庫查詢資料。

完整程式碼在這裡:https://github.com/sunday866/MASA.IoT-Training-Demos