@
我們可以將裝置上行資料儲存到關係型資料庫中,我們需要兩張帶有時間戳的表(最新資料表 和 歷史資料表),歷史資料表儲存所有裝置上報的資料,最新資料表需要儲存裝置最新一條上報資料,這條最新資料相當於裝置的當前狀態。然後展示的時候只展示最新一條資料的狀態,報表查詢可以按照裝置id和時間從歷史資料表查詢彙總。
這樣是可以的,但是我們的最新資料表需要被頻繁的更新,資料量少的時候沒問題。但資料量大,並行高的時候就會出現問題。
1、儲存成本:資料不會被壓縮,導致佔用儲存資源。
2、維護成本:單表資料量太大時,需要人工分庫分表。
3、寫入效能:單機寫入吞吐量難以滿足大量上行資料的寫入需求,資料庫存在效能瓶頸。
4、查詢效能:資料量太大導致查詢效能受到影響。
我們可以採用時序庫來解決上述問題,首先來了解一下什麼是時序資料。時序資料是按照時間維度進行索引的資料,它記錄了某個被測量實體在一定時間範圍內,每個時間點上的一組測試值。感測器上傳的室內PM2.5和甲醛資料、淨水器感測器當前的TDS值、計算機系統的監控資料等,都屬於時序資料,時序資料有如下特點:
1、資料量較大,寫入操作是持續且平穩的,而且寫多讀少。
2、只有寫入操作,幾乎沒有更新操作,比如去修改感測器的歷史資料,是沒有意義的。
3、沒有隨機刪除,即使刪除也是按照時間範圍進行刪除。刪除某一個時間點的資料沒有意義,但是刪除2年前的資料是有意義的。
4、資料實時性和時效性強,資料隨著時間的推移不斷追加,舊資料很快失去意義。
5、大部分以時間和實體為維度進行查詢,很少以測試值為維度查詢,比如使用者會查詢某個時間段的溫度資料,但是很少會去查詢溫度高於多少度的資料記錄。
顯然IoT的業務是符合使用時序庫的場景的。
序資料庫就是用來儲存時序資料的資料庫,時序資料庫相較於傳統的關係型資料和非關係型資料庫而言,專門優化了對時序資料的儲存,開源的時序資料庫有InfluxDB OpenTSDB、TimeScaleDB 等。本文以InfluxDB資料庫進行演示。
時序資料庫有如下幾個概念。
1.Metric:度量,相當於關係型資料庫中的表(table)。
2.Data Point:資料點,相當於關係型資料庫的中的行(row)。
3.Timestamp:時間戳,資料點生成時的時間戳。
4.Field:測量值,比如溫度和溼度、PM2.5等。
5.Tag:標籤,用於標識資料點,通常用來標識資料點的來源,比如溫度和溼度資料來自哪個房間,哪個裝置,可以當作關係型資料庫表的主鍵。
如下圖,度量為 Wind,每一個資料點都具有一個 timestamp,兩個 field:direction 和 speed,兩個 tag:sensor、city。它的第一行和第三行,存放的都是 sensor 號碼為 95D8-7913 的裝置,屬性城市是上海。隨著時間的變化,風向和風速都發生了改變,風向從 23.4 變成 23.2;而風速從 3.4 變成了 3.3。
圖片來自網路
安裝參考官方檔案,為了方便,我這裡採用docker安裝
docker run --name influxdb -p 8086:8086 influxdb:2.7.0
我們開啟 伺服器ip:8086 可以看到它自帶的管理介面,我們首先建立使用者名稱密碼,組織、以及Bucket的名稱。
這裡的bucket "IoTDemos" 相當於資料庫的名稱
我們記錄一下這個Token,一會連線influxdb需要,相當於賬號密碼
對於時序庫來講,時間戳是非常重要的,但是我們拿到的playload並沒有時間戳(MQTTNet包我沒有找到拿時間戳的方法)。
所以我們需要在mqtt上想辦法,讓裝置上報資料的時候,mqtt自動新增時間戳到playload中。
1、我們在資料整合->規則中新建一條規則名稱為"Add_Ts"。SQL編寫如下
SELECT
*,
now_timestamp('millisecond') as payload.Ts
FROM
"topic/#"
topic/# 代表訊息釋出到"topic/#"主題的事件
now_timestamp函數返回當前時間的 Unix 時間戳,我們將時間戳寫入到payload的Ts屬性中,關於更多內建SQL函數,請參考官方檔案
https://www.emqx.io/docs/zh/v5.0/data-integration/rule-sql-builtin-functions.html
2、我們開啟下面的偵錯,模擬裝置上報一條資料,可以看到這條規則幫我們加入了時間戳。
3、然後我們還需要處理新增了時間戳的處理結果,我們在右側新增一個動作,選擇訊息重發布,將剛剛新增了時間戳的訊息重發到一個新的Topic上,我們使用topic/dp,並在playload中新增${payload},這樣我們就修改了playload中的資訊,新增了我們需要的時間戳,當然,我們Hub訂閱的訊息也需要對應修改,新增/dp字尾。
4、首先我們先修改MASA.IoT.Hub的組態檔,Topic新增"/dp"字尾
"MqttSetting": {
...
"Topic": "$share/IotHub/topic/+/dp"
},
5、CallbackAsync中,因為我們裝置名稱是從Topic擷取的,也要對應修改一下。
private async Task CallbackAsync(MqttApplicationMessageReceivedEventArgs e)
{
var deviceDataPointStr = System.Text.Encoding.Default.GetString(e.ApplicationMessage.PayloadSegment);
Console.WriteLine(deviceDataPointStr);
var pubSubOptions = new PubSubOptions
{
//修改一下獲取裝置名稱的方式
DeviceName = e.ApplicationMessage.Topic[6..^3],
Msg = deviceDataPointStr,
PubTime = new DateTimeOffset(DateTime.Now).ToUnixTimeMilliseconds(),
TrackId = Guid.NewGuid()
};
...
}
解決完時間戳的問題,我們就可以編寫程式碼向InfluxDB中寫入資料了,我們首先在Infrastructure資料夾下建立ITimeSeriesDbClient介面和TimeSeriesDbClient類,使用介面也方便我們日後更換其他的時序庫。
這裡使用了InfluxDB.Client包。
ITimeSeriesDbClient.cs
namespace MASA.IoT.Core.Infrastructure
{
public interface ITimeSeriesDbClient
{
bool WriteMeasurement<T>(T measurement);
}
}
TimeSeriesDbClient.cs
using InfluxDB.Client;
using InfluxDB.Client.Api.Domain;
using MASA.IoT.WebApi;
using Microsoft.Extensions.Options;
namespace MASA.IoT.Core.Infrastructure
{
public class TimeSeriesDbClient : ITimeSeriesDbClient
{
private readonly InfluxDBClient _client;
private readonly string _bucket;
private readonly string _org;
private readonly AppSettings _appSettings;
public TimeSeriesDbClient(IOptions<AppSettings> settings)
{
_appSettings = settings.Value;
_org = _appSettings.InfluxDBSetting.Org;
_bucket = _appSettings.InfluxDBSetting.Bucket;
_client = new InfluxDBClient(_appSettings.InfluxDBSetting.Url, _appSettings.InfluxDBSetting.Token);
}
public bool WriteMeasurement<T>(T measurement)
{
try
{
using var writeApi = _client.GetWriteApi();
writeApi.WriteMeasurement<T>(measurement, WritePrecision.Ms, _bucket, _org);
return true;
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
return false;
}
}
}
}
這裡使用new InfluxDBClient(_appSettings.InfluxDBSetting.Url, _appSettings.InfluxDBSetting.Token)來構造InfluxDBClient。
Token就是我們建立Bucket過程中儲存的Token。
Url是我們InfluxDB的存取地址:http://127.0.0.1:8086
寫入的方法WriteMeasurement中我們通過_client.GetWriteApi建立一個寫入的api然後直接將我們要寫入的泛型實體寫入,第二個可選引數代表寫入精度,這裡我們使用WritePrecision.Ms
我們在DeviceHandler.cs中注入ITimeSeriesDbClient 並新增一個WriteMeasurementAsync方法,在方法中我們先根據裝置名稱獲取產品,如果識別產品ID為10001(空淨產品),
那麼我們就寫入資料到Measurement:AirPurifierDataPoint
Measurement相當於資料庫的表。
Measurement和Column特性都是InfluxDB.Client.Core提供的,可以用來標識Tag、Timestamp等
using InfluxDB.Client.Core;
using Newtonsoft.Json;
namespace MASA.IoT.Core.Contract
{
[Measurement("AirPurifierDataPoint")]
public class AirPurifierDataPoint
{
/// <summary>
/// 裝置名稱
/// </summary>
[Column("DeviceName", IsTag = true)] public string DeviceName { get; set; }
/// <summary>
/// 產品ID
/// </summary>
[Column("ProductId", IsTag = true)] public Guid ProductId { get; set; }
/// <summary>
/// Pm2.5
/// </summary>
[Column("PM_25")] public double? Pm_25 { get; set; }
/// <summary>
/// 溫度
/// </summary>
[Column("Temperature")] public double? Temperature { get; set; }
/// <summary>
/// 溼度
/// </summary>
[Column("Humidity")] public double? Humidity { get; set; }
/// <summary>
/// 時間戳
/// </summary>
[JsonProperty(propertyName: "Ts")]
[Column(IsTimestamp = true)] public long Timestamp { get; set; }
}
}
public class DeviceHandler : IDeviceHandler
{
private readonly MASAIoTContext _ioTDbContext;
private readonly IMqttHandler _mqttHandler;
private readonly ITimeSeriesDbClient _timeSeriesDbClient;
public DeviceHandler(MASAIoTContext ioTDbContext, IMqttHandler mqttHandler, ITimeSeriesDbClient timeSeriesDbClient)
{
_ioTDbContext = ioTDbContext;
_mqttHandler = mqttHandler;
_timeSeriesDbClient = timeSeriesDbClient;
}
/// <summary>
/// 寫入資料
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="pubSubOptions"></param>
/// <returns></returns>
public async Task<bool> WriteMeasurementAsync<T>(PubSubOptions pubSubOptions)
{
var device = await _ioTDbContext.IoTDeviceInfo.Include(o => o.ProductInfo).AsNoTracking()
.FirstOrDefaultAsync(o => o.DeviceName == pubSubOptions.DeviceName);
if (device != null && device.ProductInfo.ProductCode == "10001") //空氣淨化器產品
{
var airPurifierDataPoint = JsonConvert.DeserializeObject<AirPurifierDataPoint>(pubSubOptions.Msg);
airPurifierDataPoint.ProductId = device.ProductInfoId;
return _timeSeriesDbClient.WriteMeasurement<AirPurifierDataPoint>(airPurifierDataPoint);
}
return false;
}
除了WriteMeasurement方法之外,還提供了很多其他方法,如WritePoint,和批次寫入的方法,可自行測試。
我們啟動專案,通過MQTTX向"topic/284202304230001"上報一條資料
{
"DeviceName":"284202304230001",
"Pm_25":100,
"Temperature":25,
"Humidity":50
}
我們在influxDB的管理工具中使用Data Explorer,使用如下的flux query查詢語句,即可查出5分鐘之內的資料,注意,這裡的時間是UTC時間
如果想顯示北京時區方便偵錯,可以在後面新增|> timeShift(duration: 8h)
from(bucket: "IoTDemos")
|> range(start:-5m)
關於flux查詢語法
本節我們簡單介紹了開源時序資料庫influxDB的安裝。
我們藉助InfluxDB.Client庫完成裝置從上報到時序庫資料儲存的全過程,下一節我們介紹從時序庫查詢資料。
完整程式碼在這裡:https://github.com/sunday866/MASA.IoT-Training-Demos