FLuxMQ是一款基於java開發,支援無限裝置連線的雲原生分散式物聯網接入平臺。FluxMQ基於Netty開發,底層採用Reactor3反應堆模型,具備低延遲,高吞吐量,千萬、億級別裝置連線;方便企業快速構建其物聯網平臺與應用。
FluxMQ官網:https://www.fluxmq.com FluxMQ演示系統:http://demo.fluxmq.com/
功能 | 說明 |
---|---|
自研EventBus通訊元件 | 提供大批次資料路由壓縮等功能 |
分散式釋出訂閱匹配樹 | 提供更快速度的Topic路由 |
對談訊息 | 支援Web管理對談訊息、資料持久化 |
保留訊息 | 支援Web管理保留訊息、資料持久化 |
延遲訊息 | 支援Web管理延遲訊息 、資料持久化 |
設定持久化 | Web設定頁面設定持久化 |
規則引擎 | 新增LOG資料來源,資料寫入獨立LOG檔案 |
規則引擎 | 新增JSON函數,使用者處理巢狀的JSON資料格式 |
規則引擎 | 新增協定擴充套件資料格式,使用者統一轉發第三方擴充套件協定 |
多協定模組 | 基於FluxMQ MQTT協定拓展多協定模組、提供同一的連線訂閱管理能力 |
❝去除Ignite通訊元件,採用自研叢集通訊,結合分散式訂閱樹完成效能的大幅度提升
❞
Flux1.0版本採用Ignite的message API進行資料路由,此方式主要有以下幾個缺點:
基於TCP元件(後面拓展UDP組播等功能),實現叢集間啟動相互連線,伺服器端實現埠佔用掃描啟動:預設48880埠、如果埠佔用則依次遞增,最大埠號為:49000。 節點啟動後,使用者端根據組態檔設定的叢集IP自動進行埠掃描連線(48880->49000),同時使用者端維護與伺服器端的心跳。避免節點宕機。
固定頭(1 byte) | Topic長度(1 byte) | Topic(n byte) | Body長度(2 byte) | Body(n byte) | ||||
---|---|---|---|---|---|---|---|---|
訊息型別 2bit | Qos 2bit | 是否壓縮 1bit | 是否批次 1bit | 保留bit 2bit | 9 | test/test | 11 | HELLO,WORLD |
❝FluxMQ對於叢集訊息路由會自動計算TPS,當單節點TPS超過2000時,會自動啟動批次壓縮功能,以此提高叢集間傳輸效能(對時延要求極高的可以手動關閉批次壓縮功能)。
❞
FluxMQ叢集節點間維護一個Root級別的訂閱,訂閱會分為2種:
❝2.0版本新增延遲訊息跟對談訊息,1.0版本也有保留訊息,但是管理頁面未實現視覺化管理。下面我們介紹下此次改動的一些
❞
提供分散式對談訊息,對談期間Session訊息持久化儲存,叢集宕機後重啟,資料不丟失,重啟叢集後資料重新載入
根據Topic保留訊息,每個TOPIC僅保留一條,當傳輸的MQTT payload為空時,則清空保留訊息。資料持久化儲存,重啟叢集後,資料重新載入
❝FluxMQ提供大批次定時下發Topic指令的能力,單機支援百萬級別延遲訊息指令下發,在叢集模式下,FluxMQ接收到延遲指令後,會自動負載到執行節點執行,當執行節點宕機後,此節點未執行的任務會自動由其他節點繼續執行,提供分散式協調任務的能力
❞
延遲Topic格式:
$DELAY/延遲秒指/TOPIC
基於Ignite的實現設定資料區持久化,目前持久化的資料內容有以下:
資料區 | 是否開啟持久化 |
---|---|
資料來源設定 | ✔️ |
規則設定 | ✔️ |
ACL設定 | ✔️ |
系統設定 | ✔️ |
保留訊息 | ✔️ |
對談訊息 | ✔️ |
延遲訊息 | ✔️ |
規則引擎 | ✔️ |
雲使用者端 | ✔️ |
協定擴充套件 | ✔️ |
❝此項功能可以用於偵錯報文,並且於叢集各節點生產獨立的log檔案,用於快速定位問題
❞
資料輸入:
{
"msg": {
"id":"id",
"body":{
"state":1,
"no":2
}
},
"messageId": 1,
"topic": "test",
"qos": 1,
"retain": false,
"time": "2022 12-22 12:00:00",
"clientId": "A1212313"
}
此時我只想插入msg內容下的body結構體,以下是一個通用的插入SQL語句模板:
insert into table (clientId,topic,msg) values ('${clientId}','${topic}','${json(msg.body)}')
通過json(變數名) 方式給結構體轉成json字串替換成插入欄位的值
目前FluxMQ內建了COAP、WEBSOCKET、I1協定的元件,可以指定埠啟動,啟動後,可以通過MQTT與協定元件之間互動。每個使用者端必須按照FluxMQ的標準進行接入。擴充套件協定與FluxMQ的MQTT共用以下元件:
❝通過規則引擎設定選擇擴充套件協定資料型別
❞
select * from "$EVENT.EXTENSION"
傳輸的資料格式如下:
{
"protocol": "I1",
"cmd": "PUBLISH",
"messageId": 0,
"time": "2023-07-11 21:59:23",
"clientId": "clientId",
"nodeIp": "127.0.0.1",
"clientIp": "127.0.0.1:19999",
"body": "body"
}
欄位 | 說明 |
---|---|
protocol | 協定名稱 |
cmd | 指令型別 - PUBLISH 推播訊息 - CONNECT 連線 - CLOSE 斷開 |
messageId | 訊息id |
time | 時間 |
clientIp | 使用者端地址 |
nodeIp | 所在叢集節點IP |
body | 報文,如果傳輸是JSON會自動轉成JSON格式,否則統一UTF8字串處理 |
SQL如下:
select * from "$EVENT.EXTENSION WHERE protocol='WEBSOCKET'"
SQL如下:
select * from "$EVENT.EXTENSION WHERE protocol='WEBSOCKET' AND cmd ='PUBLISH'
通過MQTT使用者端下發FluxMQ叢集指令,即可將指令寫給擴充套件協定使用者端,格式如下:
$PROTOCOL/協定名稱/{clientId}
ws://123.249.9.130:7777/test