分散式多協定接入閘道器FluxMQ-2.0功能說明

2023-07-12 06:00:35

FluxMQ—2.0版本更新內容

前言

FLuxMQ是一款基於java開發,支援無限裝置連線的雲原生分散式物聯網接入平臺。FluxMQ基於Netty開發,底層採用Reactor3反應堆模型,具備低延遲,高吞吐量,千萬、億級別裝置連線;方便企業快速構建其物聯網平臺與應用。

FluxMQ官網:https://www.fluxmq.com FluxMQ演示系統:http://demo.fluxmq.com/

改動說明

功能 說明
自研EventBus通訊元件 提供大批次資料路由壓縮等功能
分散式釋出訂閱匹配樹 提供更快速度的Topic路由
對談訊息 支援Web管理對談訊息、資料持久化
保留訊息 支援Web管理保留訊息、資料持久化
延遲訊息 支援Web管理延遲訊息 、資料持久化
設定持久化 Web設定頁面設定持久化
規則引擎 新增LOG資料來源,資料寫入獨立LOG檔案
規則引擎 新增JSON函數,使用者處理巢狀的JSON資料格式
規則引擎 新增協定擴充套件資料格式,使用者統一轉發第三方擴充套件協定
多協定模組 基於FluxMQ MQTT協定拓展多協定模組、提供同一的連線訂閱管理能力

自研EventBus通訊元件

研發背景

去除Ignite通訊元件,採用自研叢集通訊,結合分散式訂閱樹完成效能的大幅度提升

Flux1.0版本採用Ignite的message API進行資料路由,此方式主要有以下幾個缺點:

  1. 叢集間通訊採用廣播方式,大叢集下通訊效能極低
  2. 不支援萬用字元方式,無法解決萬用字元的路由
  3. 無法實現叢集消費能力
  4. 對Ignite依賴非常重,導致出現問題排查代價很高,並且無法替換之

EventBus特性

叢集通訊

基於TCP元件(後面拓展UDP組播等功能),實現叢集間啟動相互連線,伺服器端實現埠佔用掃描啟動:預設48880埠、如果埠佔用則依次遞增,最大埠號為:49000。 節點啟動後,使用者端根據組態檔設定的叢集IP自動進行埠掃描連線(48880->49000),同時使用者端維護與伺服器端的心跳。避免節點宕機。

報文幀

表格範例

固定頭(1 byte) Topic長度(1 byte) Topic(n byte) Body長度(2 byte) Body(n byte)
訊息型別 2bit Qos 2bit 是否壓縮 1bit 是否批次 1bit 保留bit 2bit 9 test/test 11 HELLO,WORLD

批次壓縮

FluxMQ對於叢集訊息路由會自動計算TPS,當單節點TPS超過2000時,會自動啟動批次壓縮功能,以此提高叢集間傳輸效能(對時延要求極高的可以手動關閉批次壓縮功能)。

分散式釋出訂閱匹配樹

FluxMQ叢集節點間維護一個Root級別的訂閱,訂閱會分為2種:

  • 本地訂閱
  • 遠端訂閱 為了最快搜尋匹配樹,訂閱資訊會維護到每個節點中,當推播Topic通過匹配樹匹配後,本地訂閱直接Write訊息,遠端訂閱走EventBus系統傳輸到遠端節點。

資料管理

2.0版本新增延遲訊息跟對談訊息,1.0版本也有保留訊息,但是管理頁面未實現視覺化管理。下面我們介紹下此次改動的一些

對談訊息

提供分散式對談訊息,對談期間Session訊息持久化儲存,叢集宕機後重啟,資料不丟失,重啟叢集後資料重新載入

保留訊息

根據Topic保留訊息,每個TOPIC僅保留一條,當傳輸的MQTT payload為空時,則清空保留訊息。資料持久化儲存,重啟叢集後,資料重新載入

延遲訊息

FluxMQ提供大批次定時下發Topic指令的能力,單機支援百萬級別延遲訊息指令下發,在叢集模式下,FluxMQ接收到延遲指令後,會自動負載到執行節點執行,當執行節點宕機後,此節點未執行的任務會自動由其他節點繼續執行,提供分散式協調任務的能力

延遲Topic格式:

$DELAY/延遲秒指/TOPIC

設定持久化

基於Ignite的實現設定資料區持久化,目前持久化的資料內容有以下:

資料區 是否開啟持久化
資料來源設定 ✔️
規則設定 ✔️
ACL設定 ✔️
系統設定 ✔️
保留訊息 ✔️
對談訊息 ✔️
延遲訊息 ✔️
規則引擎 ✔️
雲使用者端 ✔️
協定擴充套件 ✔️

規則引擎

LOG檔案列印

此項功能可以用於偵錯報文,並且於叢集各節點生產獨立的log檔案,用於快速定位問題

資料庫SQL模板支援Json函數'

資料輸入:

{
    "msg": {
      "id":"id",
      "body":{
        "state":1,
        "no":2
      }
    },
    "messageId": 1,
    "topic""test",
    "qos": 1,
    "retain"false,
    "time""2022 12-22 12:00:00",
    "clientId""A1212313"
}

此時我只想插入msg內容下的body結構體,以下是一個通用的插入SQL語句模板:

insert into table (clientId,topic,msg) values ('${clientId}','${topic}','${json(msg.body)}')

通過json(變數名) 方式給結構體轉成json字串替換成插入欄位的值

多協定模組

目前FluxMQ內建了COAP、WEBSOCKET、I1協定的元件,可以指定埠啟動,啟動後,可以通過MQTT與協定元件之間互動。每個使用者端必須按照FluxMQ的標準進行接入。擴充套件協定與FluxMQ的MQTT共用以下元件:

  • 認證模組
  • 規則引擎
  • 連線管理
  • 紀錄檔管理
  • 監控管理

上行指令

通過規則引擎設定選擇擴充套件協定資料型別

select * from "$EVENT.EXTENSION"

傳輸的資料格式如下:

{
    "protocol""I1",
    "cmd""PUBLISH",
    "messageId": 0,
    "time""2023-07-11 21:59:23",
    "clientId""clientId",
    "nodeIp""127.0.0.1",
    "clientIp""127.0.0.1:19999",
    "body""body"
}
欄位 說明
protocol 協定名稱
cmd 指令型別
- PUBLISH 推播訊息
- CONNECT 連線
- CLOSE 斷開
messageId 訊息id
time 時間
clientIp 使用者端地址
nodeIp 所在叢集節點IP
body 報文,如果傳輸是JSON會自動轉成JSON格式,否則統一UTF8字串處理

新增一個轉發WEBSOCKET協定的報文

SQL如下:

select * from "$EVENT.EXTENSION WHERE protocol='WEBSOCKET'"

新增一個轉發WEBSOCKET 上報協定的報文

SQL如下:

select * from "$EVENT.EXTENSION WHERE protocol='WEBSOCKET' AND cmd ='PUBLISH'

下行指令

通過MQTT使用者端下發FluxMQ叢集指令,即可將指令寫給擴充套件協定使用者端,格式如下:

$PROTOCOL/協定名稱/{clientId}

連線管理

啟動WEBSOCKET協定外掛

WEBSOCKET使用者端連線

ws://123.249.9.130:7777/test

連線管理