Python有許多優秀的MQTT使用者端,比較有代表性的有paho-mqtt、hbmqtt、gmqtt等,各有特色
paho-mqtt 有著最優秀的檔案,程式碼風格易於理解,同時有著強大的基金會支援,目前新版本支援 MQTT 5.0
hbmqtt 使用 asyncio 庫實現,可以優化網路 I/O 帶來的延遲,但是程式碼風格不友好,檔案較少,不支援 MQTT 5.0,且不再維護,被原作者棄用,有一個分支amqtt正在由不同的人積極開發
gmqtt 同樣通過 asyncio 庫實現,相比 HBMQTT ,程式碼風格友好,最重要的是支援 MQTT 5.0
paho-mqtt 可以說是 Python MQTT 開源使用者端庫中的佼佼者, 支援5.0、3.1.1 和 3.1 的MQTT 協定,由 Eclipse 基金會主導開發.在基金會的支援下,以每年一個版本的速度更新,穩定、持續,本文使用新版本paho-mqtt v1.6.1
參考文章
使用pip安裝
pip3 install paho-mqtt
# virtualenv安裝和完整程式碼參考官方檔案
paho-mqtt已知的一些限制
截止1.6.1版本,當 clean_session 為 False 時,session 只儲存在記憶體中,不持久化。這意味著當用戶端重新啟動時(不僅僅是重新連線,通常是因為程式重新啟動而重新建立物件)對談丟失。這可能導致訊息丟失。
使用者端對談的以下部分丟失:
已從伺服器收到但尚未完全確認的 QoS 2 訊息。
由於使用者端將盲目地確認任何 PUBCOMP(QoS 2 事務的最後一條訊息),它不會掛起但會丟失此 QoS 2 訊息。
已傳送到伺服器但尚未完全確認的 QoS 1 和 QoS 2 訊息。
這意味著傳遞給 publish() 的訊息可能會丟失。這可以通過注意傳遞給 publish() 的所有訊息都有相應的 on_publish() 呼叫來緩解。
這也意味著代理可能在對談中有 Qos2 訊息。由於使用者端從一個空對談開始,它不知道它並將重新使用中間。這還沒有確定。
此外,當 clean_session 為 True 時,該庫將在網路重新連線時重新發布 QoS > 0 訊息。這意味著 QoS > 0 訊息不會丟失。但是標準規定我們是否應該丟棄傳送了釋出封包的任何訊息。我們的選擇意味著我們不符合標準,並且 QoS 2 有可能被接收兩次。如果只需要一次交付的 QoS 2 保證,應該 clean_session = False
使用paho-mqtt實現使用者端相關功能簡單步驟如下:
使用使用者端連線代理、訂閱等,首先我們需要先建立一個使用者端,paho-mqtt使用Client()建立使用者端範例
# Client 原始碼 引數如下
def __init__(self, client_id="", clean_session=None, userdata=None,
protocol=MQTTv311, transport="tcp", reconnect_on_failure=True):
# 引數示意
client_id = '使用者端ID,str型別,可自定義'
'''
如果長度為零或者為空,則會隨機生成一個,這種情況下,clean_session引數必須為True清除對談,因為持久對談需要client_id為唯一標識來恢復對談
'''
clean_session = '對談清除狀態,bool型別,設為True,broker在斷開連線的時候刪除該client的資訊,為False,則相當於是持久對談'
'''
clean_session在官方檔案範例預設值為True,檢視1.6.1版本原始碼預設值為None,看邏輯應該是支援mqttv5之後做的的變化
原始碼中判斷如果protocol是MQTT V5版本 clean_session不是None的話則丟擲ValueError,就是mqtt5的clean_session必須持久對談,
如果不是mqtt5的話如果clean_session is None 則將clean_session設定為True,這塊就與官方檔案的預設邏輯對應上了
'''
userdata = 'client使用者資料,傳遞給回撥函數,可以是任意型別,可以使用Clinet的 user_data_set()函數進行更新資料'
protocol = '使用者端協定的版本,預設是MQTTv311就是3.1.1版本,也可以是MQTTv31、MQTTv5版本'
'''
protocol的引數在原始碼中是以下對應關係,理論上直接傳入對應int值或者匯入MQTTv** 欄位傳入都可
MQTTv31 = 3
MQTTv311 = 4
MQTTv5 = 5
'''
transport = '底層傳輸協定,預設是使用原始tcp,值可以設定為websockets通過ws傳送mqtt'
reconnect_on_failure = 'bool型別, 連線失敗後是否自動重新connect,預設為True'
'''
官方檔案沒有reconnect_on_failure相關範例和解釋,不確定老版本有沒有該欄位
'''
# -*- coding: utf-8 -*-
# @Time: 2023/5/10 16:09
# @Author: LiQi
# @Describe:
import paho.mqtt.client as mqtt # 匯入clinet 別名 mqtt
# 建立一個使用者端範例賦值client,client_id自定義,其他引數根據需要設定
client = mqtt.Client(client_id='muziqi')
'''paho-mqtt提供reinitialise方法重新初始化已經構造好的使用者端'''
# 上面建立的使用者端clinet,直接呼叫reinitialise
client.reinitialise()
'''
reinitialise方法擁有下面三個引數
client_id="", clean_session=True, userdata=None
'''
選項函數的作用是給構造好的使用者端新增附加選項,一般情況下在使用content連線broker前完成,比如設定Client的證書、回撥、賬號密碼等,根據具體業務使用,設定完成後使用conten連線到broker
當QoS> 0的時候,可以存在的部分完成(已經傳送,還沒有確認成功的訊息)的訊息的最大數量,這些訊息可以同時通過網路流,預設為20.增加此值將消耗更多記憶體,但可以增加吞吐量
# inflight引數為要修改的數量,最小為1,小於1則丟擲ValueError
client.max_inflight_messages_set(inflight)
設定傳出訊息佇列中等待處理的QoS> 0的傳出訊息的最大數量,預設為0表示無限制,當佇列已滿時,其他傳出的訊息都將被丟棄,實際使用中0實際上並不是無限,目前實現最大可為65555,包括佇列中的65535條訊息+inflight的預設20條訊息,如果inflight預設值被訊息,佇列中的實際訊息數量被減少
# queue_size是要修改的最大數量
client.max_queued_messages_set(queue_size)
QoS>0 的訊息,如果傳送之後超過一定時間broker沒有響應,重發訊息的時間,單位是s,預設5s
'''
原始碼註釋該方法不再使用,在2.0版本刪除
'''
設定WebSocket的連線選項,構造Client時transport="websockets"才會使用,connect相關方法之前呼叫
import paho.mqtt.client as mqtt
# 傳輸協定設定為ws
client = mqtt.Client(client_id='muziqi',transport='websockets')
'''
path:broker的mqtt 路徑,以/開頭的字串
headers:頭部資訊可以是字典或者是可呼叫物件。如果是字典的話字典中額外的項被新增到websocket頭中。如果是一個可呼叫的物件,預設的websocket的頭部資訊被傳遞到這個函數,將函數結果當做新的頭不新鮮'''
client.ws_set_options(path="/mqtt", headers=None)
設定網路加密和身份驗證選項,啟用SSL/TLS支援,connect相關方法之前呼叫
# 以下所有引數預設值均為None,根據業務自身選用
client.tls_set(ca_certs, certfile, keyfile, cert_reqs, tls_version, ciphers, keyfile_password)
'''
ca_certs: CA 根證書的路徑,如果這是給定的唯一選項,則使用者端將以與 Web 瀏覽器類似的方式執行,要求代理擁有由ca_certs中的證書頒發機構簽名的證書,並將使用 TLS v1.2 進行通訊,但不會嘗試任何形式的身份驗證。這提供了基本的網路加密,具體取決於代理的設定方式。預設情況下,在 Python 2.7.9+ 或 3.4+ 上,使用系統的預設證書頒發機構。在較舊的 Python 版本上,此引數是必需的
certfile和keyfile: 使用者端私鑰和證書的路徑,分別指向 PEM 編碼的使用者端證書和私鑰的字串,如果這些引數不是None那麼它們將被用作基於 TLS 的身份驗證的使用者端資訊,對此功能的支援取決於代理
cert_reqs: 設定使用者端對 broker 證書的需求,預設是 ssl.CERT_REQUIRED ,表示 broker 必須提供一個證書
tls_version:使用的SSL/TLS協定版本,預設是TLS v1.2 一般不做修改
ciphers:字串,設定允許使用哪些加密方法,預設是None
keyfile_password:如果certfile和keyfile加密了需要密碼解密,可以使用該引數傳遞,如果不提供的話需要在終端輸入,目前無法定義回撥以提供密碼
'''
設定網路加密和身份驗證上下文,啟用SSL/TLS支援,connect相關方法之前呼叫
client.tls_set_context(context)
'''
引數釋義
ssl.SSLContext物件,在python3.4之後,預設情況下由ssl.create_default_context()提供,一般無需自定義設定,業務需要不確定是否使用該方法的話,用tls_set or 預設上下文即可
'''
設定對伺服器證書中的伺服器主機名進行驗證,在connect相關方法之前和 tls_set 或 tls_set_context之後呼叫
client.tls_insecure_set(value)
'''
value是bool型別,設定為True代幣不使用加密,Flase使用加密
值設定為True,可能讓惡意第三方通過 DNS 欺騙等方式冒充你的伺服器
測試程式可以使用該方法來不進行驗證
'''
# 原始碼根據tls_set選項的cert_reqs欄位來設定預設值
if cert_reqs != ssl.CERT_NONE:
self.tls_insecure_set(False)
else:
self.tls_insecure_set(True)
使用 python 紀錄檔包 logging啟用紀錄檔記錄,可以跟後面的on_log紀錄檔回撥方法同時使用
client.enable_logger(logger)
'''
logger引數用來指定記錄器
如果指定了記錄器,那麼將使用該logging.Logger物件,否則將自動建立一個
'''
Paho紀錄檔記錄級別和標準紀錄檔級別對應關係如下
禁用python 紀錄檔包 記錄紀錄檔,對on_log回撥沒有影響
client.disable_logger()
'''
disable_logger相當於是把enable_logger記錄器清除
兩個方法的原始碼也很簡單
↓↓↓
'''
def enable_logger(self, logger=None):
if logger is None:
if self._logger is not None:
return
logger = logging.getLogger(__name__)
self._logger = logger
def disable_logger(self):
self._logger = None
設定使用者名稱和可選的代理身份驗證密碼,密碼根據broker驗證進行設定,可為空,在connect相關方法前呼叫
client.username_pw_set(username,password=None)
將在生成事件時傳遞給回撥方法的私有使用者資料,可以使用這個方法更新構造Client時候的userdata欄位值
client.user_data_set(userdata)
設定遺囑訊息,client沒有使用disconnect方法以外斷開連線,broker推播遺囑訊息
client.will_set(topic,payload,qos,retain,properties)
'''
topic:遺囑釋出主題
payload:預設值為None,遺囑訊息內容
qos:預設值為0,訊息質量級別
retain:bool,預設False,是否將遺囑訊息設定為保留訊息
properties:預設值為None,支援MQTTv5後新增的欄位,用來設定遺囑屬性
'''
遺囑訊息邏輯、報文、上述欄位、屬性可參考https://www.cnblogs.com/Mickey-7/p/17385599.html>
使用者端斷開重新連線延遲的設定
client.reconnect_delay_set(min_delay,max_delay)
'''
min_delay:預設1
max_delay:預設值120
連線丟失的時候,預設等待min_delay,每次嘗試重連後下次min_delay都將翻倍,上限時間是max_delay
連線成功將重置min_delay,Client收到broker的connack報文視為完全連線成功
'''
connect方法的作用是將構造好、設定好選項的的client連線到broker,這是一個阻塞函數
client.connect(host, port, keepalive, bind_address, bind_port,clean_start,properties)
'''
host:遠端代理的主機名或 IP 地址
prot:要連線的伺服器主機的埠,預設為1883,如果是基於SSL/TLS的MQTT的預設埠為8883,如果選項函數使用tls_set或者tls_set_context,可能需要手動提供埠
keepalive:心跳間隔時間,預設為60,單位是秒
bind_address: 預設值是空字串,如果client的本地有多個地址,可以使用該引數繫結一個地址,表示來源
bind_port:預設值是0, 與bind_address一樣,本地有多個地址,可以使用該引數繫結一個埠,表示來源
clean_start:對談設定,預設值 MQTT_CLEAN_START_FIRST_ONLY = 3, mqtt v5版本僅在第一次成功連線時使用乾淨啟動標誌,mqttv5如 果不是3則ValueError
properties:設定對談屬性
'''
clean_start和properties參考:https://www.cnblogs.com/Mickey-7/p/17361919.html
非同步連線方法,與loop_start結合使用以及非阻塞的方式連線,如果一直沒有呼叫loop_start,不會完成連線
client.connect_async(host, port, keepalive, bind_address, bind_port,clean_start,properties)
'''
引數參考connect方法
'''
DNS連線,使用 SRV DNS查詢連線到代理以獲取代理地址
client.connect_srv(domain, keepalive, bind_address,clean_start, properties)
'''
domain:用於搜尋SRV記錄的DNS域;如果沒有則嘗試本地域名
其餘引數參考connect方法
'''
在使用connect相關方法連線過之後,想要重新連線可以使用reconnect方法,完全複用之前connect相關的引數進行連線
client.reconnect()
主動徹底斷開和broker的連線,呼叫該方法不會等待排隊的訊息被傳送
client.disconnect()
網路迴圈的函數有四種,執行在後臺,處理收發的訊息,如果不使用的話,傳入的資料不會被處理,傳出的資料不會傳送
定期呼叫用以處理訊息,通過 select() 函數阻塞,直到有訊息需要收發,並根據訊息型別觸發對應的回撥函數
run_status = True:
while run_status:
clinet.loop(timeout,max_packets)
'''
timeout:預設值1.0,阻塞超時時間,單位S,不能超過心跳時間keepalive,否則client會定時從broker 斷開
max_packets:預設值是1,不用設定,已經過時不使用
'''
實現loop的呼叫和停止,在connect相關方法之前或之後呼叫loop_start,會在後臺執行一個執行緒欄位呼叫loop,無需編碼實現loop迴圈,
每個使用者端必須有一個loop迴圈
# 在connect相關方法之前或之後呼叫,會在後臺執行一個執行緒呼叫loop,連線中斷的時候會自動嘗試重新連線,無需編碼實現loop迴圈
client.loop_start()
# 停止loop迴圈的後臺執行緒
client.loop_stop()
網路迴圈的阻塞形式,loop_forever 呼叫會阻塞主執行緒,永遠不會停止,直到使用者端呼叫disconnect,會自動重新連線
client.loop_forever(timeout,max_packets,retry_first_connection)
'''
retry_first_connection:bool,預設False,在第一次連線嘗試失敗後是否應該進行重試,與reconnect_on_failure不同,它隻影響第一次連線,如果設定為True,當第一次連線失敗時,paho-mqtt會丟擲OSError/WebsocketConnectionError異常
'''
loop方法:預設呼叫一下只回圈一次,然後返回。這意味著它只會執行一次通訊迴圈,不使用start方法的話需要無限while 並且在完成後將返回到呼叫程式。
loop_forever方法:它是一個無限迴圈,如果沒有意外情況,將一直保持執行。 它在連線丟失或出現其他錯誤時會自動重新連線。因此,它可以用於長期執行的應用程式,以保持MQTT使用者端連線
loop_start函數是一個非阻塞的函數,可以啟動一個執行緒來處理MQTT使用者端的網路通訊和事件處理。該函數返回後,MQTT使用者端將會在後臺執行緒中執行,不會阻塞當前執行緒。在呼叫loop_start函數後,可以分別呼叫connect、publish、subscribe等方法來進行MQTT通訊。但是在使用完MQTT使用者端後,需要呼叫loop_stop函數來停止後臺執行緒
loop_forever函數是一個阻塞的函數,可以啟動一個執行緒來處理MQTT使用者端的網路通訊和事件處理。該函數會一直阻塞當前執行緒,直到MQTT使用者端接收到disconnect訊息或者呼叫了disconnect函數。在呼叫loop_forever函數後,不能再使用connect、publish、subscribe等方法來進行MQTT通訊,因為當前執行緒已經被阻塞。只有在呼叫了disconnect函數後,才會解除阻塞
因此,loop_start和loop_forever的區別在於是否阻塞當前執行緒。如果需要在MQTT使用者端後臺執行並且繼續使用當前執行緒進行其他操作,可以使用loop_start函數;如果需要一直阻塞當前執行緒直到MQTT使用者端退出,可以使用loop_forever函數
使用者端傳送訊息到broker,broker將訊息轉發到訂閱匹配主題的使用者端
client.publish(topic, payload, qos, retain, properties)
'''
topic:訊息釋出的主題
payload:預設None,要傳送的實際訊息,如果沒有或者為None,將傳送長度為0的訊息,payload傳遞int或者float會被預設轉換為str
傳送真正型別的int、float需要使用struct.pack()建立
qps:訊息之類預設0
retain:是否設定保留訊息
properties:設定保留訊息的屬性
'''
訊息傳送之後會返回一個MQTTMessageInfo物件,有以下屬性和方法
如果主題為None,長度為零或無效(包含萬用字元),如果qos不是0、1 或 2 之一,或者有效負載的長度大於 268435455 位元組,則會引發ValueError
使用者端訂閱主題方法,可以通過3種方式訂閱,mqttv5還有3種方式
client.subscribe(topic, qos, options, properties)
topic = '訂閱的主題'
qos = '服務質量預設為0'
options:'訂閱選項,mqtt v5 使用'
'''
訂閱的選項必須是SubscribeOptions物件,位於 from paho.mqtt.subscribeoptions import SubscribeOptions
SubscribeOptions的構造引數如下
qos:與MQTT v3.1.1中相同。
noLocal:True或False。如果設定為True,則訂閱者不會收到自己的釋出。
retainAsPublished:True或False。如果設定為True,則收到的釋出的retain標誌將按釋出者設定。
retainHandling:RETAIN_SEND_ON_SUBSCRIBE,RETAIN_SEND_IF_NEW_SUB或RETAIN_DO_NOT_SEND
控制代理何時傳送保留訊息:
RETAIN_SEND_ON_SUBSCRIBE:在任何成功的訂閱請求上
RETAIN_SEND_IF_NEW_SUB:僅在訂閱請求是新的情況下
RETAIN_DO_NOT_SEND:從不傳送保留訊息
'''
properties = '設定MQTT v5.0屬性的properties範例,可選-如果不設定,則不傳送任何屬性'
字串和整數訂閱
client.subscribe('my/topic',2)
'''
topic:指定要訂閱的訂閱主題的字串。
qos:訂閱所需的服務質量級別,預設為0。
選項和屬性:未使用
'''
字串和整數的元組
client.subscribe(('my/topic',1))
'''
topic: (topic, qos)的元組
qos、選項和屬性:未使用
'''
字串和整數元組的列表
client.subscribe([('my/topcic',0),('my/topic2',1)])
'''
這允許在單個 SUBSCRIPTION 命令中進行多個主題訂閱,這比使用多次呼叫subscribe()更有效
(topic, qos)的元組列表。topic 和 qos 都必須存在於所有元組中
qos、選項和屬性:未使用
'''
字串和訂閱選項(僅MQTT v5.0)
from paho.mqtt.subscribeoptions import SubscribeOptions
client.subscribe('my/topic',options=SubscribeOptions(qos=2))
'''
topic:訂閱主題。
qos:未使用。
選項:MQTT v5.0訂閱選項。
properties:屬性未設定
'''
字串和訂閱選項元組(僅MQTT v5.0)
from paho.mqtt.subscribeoptions import SubscribeOptions
client.subscribe(('my/topic', SubscribeOptions(qos=1)))
'''
topic: (topic, SubscribeOptions)的元組。主題和訂閱選項必須出現在元組中
qos、options、roperties未使用
'''
字串和訂閱選項元組列表(僅MQTT v5.0)
from paho.mqtt.subscribeoptions import SubscribeOptions
client.subscribe([('my/topic', SubscribeOptions(qos=1)),('my/topic2', SubscribeOptions(qos=2))])
'''
允許在單個SUBSCRIPTION中進行多個主題訂閱命令,比使用多個呼叫更有效
topic:格式元組(topic, SubscribeOptions)的列表。主題和訂閱選項必須出現在所有元組中。
qos和options:未使用。
properties:未設定
'''
返回一個元組(result, mid),其中result為MQTT_ERR_SUCCESS表示成功或(MQTT_ERR_NO_CONN, None)使用者端當前未連線。
mid是訂閱請求的訊息 ID。mid 值可用於通過檢查on_subscribe()回撥中的 mid 引數(如果已定義)來跟蹤訂閱請求。
如果qos不是 0、1 或 2,或者主題為None或字串長度為零,或者主題不是字串、元組或列表,則引發ValueError
取消訂閱一個或多個主題
client.unsubscribe(topic,properties)
'''
topic:一個主題字串或主題字串列表,取消訂閱的訂閱主題
properties:預設值為None,mqttv5的訂閱屬性
'''
返回一個元組(result, mid),其中result是MQTT_ERR_SUCCESS表示成功,或者(MQTT_ERR_NO_CONN, None)如果使用者端當前未連線。mid是取消訂閱請求的訊息 ID。mid 值可用於通過檢查on_unsubscribe()回撥中的 mid 引數(如果已定義)來跟蹤取消訂閱請求。
如果主題為None或字串長度為零,或者不是字串或列表,則引發ValueError 。
回撥是為響應事件而呼叫的函數,使用回撥需要做兩件事情,1.建立回撥函數,2.將函數分配給回撥
使用者端連線、重新連線broker後,連線的回撥方法,當用戶端收到broker的CONNACK響應的時候,會觸發on_connect()回撥
on_connect(client, userdata, flags, rc)
'''
client:觸發回撥的使用者端範例
userdata:在Client()或者user_data_set()中設定的使用者資料
flags:一個包含來自代理的響應標誌的字典,僅使用 clean session 設定為 0。如果 clean session=0 的使用者端重新連線到它之前連線過 的代理,則此標誌指示代理是否仍然具有使用者端的對談資訊。如果為 1,對談仍然存在
rc:連線結果,0:連線成功 1:連線被拒絕 - 協定版本不正確 2:連線被拒絕 - 使用者端識別符號無效 3:連線被拒絕 - 伺服器不可用
4:連線被拒絕 - 使用者名稱或密碼錯誤 5:連線被拒絕 - 未授權 6-255:當前未使用
'''
使用範例
# 定義回撥的方法
def on_connect(client, userdata, flags, rc):
print("Connected with result code: " + str(rc))
# 設定使用者端的回撥on_connect屬性為定義的on_connect方法
client.on_connect = on_connect
當用戶端與代理斷開時觸發
on_disconnect(client, userdata, rc)
'''
client:觸發的使用者端範例
userdata:使用者資料
rc:斷開連線結果,如果是0,則回撥是為了響應disconnect()呼叫而呼叫的。如果有任何其他值,則斷開連線是意外的,例如可能由網路錯誤引起的
'''
使用範例
# 定義回撥函數
def on_disconnect(client, userdata, rc):
print("Unexpected disconnection %s" % rc)
# 設定使用者端的回撥on_disconnect屬性為定義的on_disconnect方法
client.on_disconnect = on_disconnect
當Clinet傳送訊息到broker,會觸發on_publish()回撥
on_publish(client, userdata, mid)
'''
mid:訊息ID
'''
使用範例
def on_publish(client, userdata, mid):
print(mid,'message publish')
'''
觸發該回撥
對於 QoS 級別 1 和 2 的訊息,意味著適當的握手已經完成
對於 QoS 0,這僅表示訊息已離開使用者端。mid變數匹配從相應的publish()呼叫返回的 mid 變數,以允許跟蹤傳出訊息。
這個回撥很重要,因為即使 publish() 呼叫返回成功,也並不總是意味著訊息已傳送
'''
on_message,當 client 接收到已訂閱的話題的訊息並且與message_callback_add自定義主題篩選不匹配的時候,會呼叫 on_message() 回撥函數
on_message(client, userdata, message)
'''
message:MQTTMessage類的範例。有topic, payload, qos, retain屬性
'''
使用範例
def on_message(client, userdata, message):
print("主題:" + msg.topic + " 訊息:"+ str(message.payload.decode("utf-8")))
client.on_message = on_message
message_callback_add,用於處理特定訂閱過濾器的傳入訊息,包括使用萬用字元,message_callback_add自定義是自定義的一個主題篩選過濾器,比如我用message_callback_add 篩選主題 A,有A的訊息觸發message_callback_add回撥,否則觸發on_message
message_callback_add(sub, callback)
'''
sub:此回撥進行匹配的訂閱篩選器
callback:要使用的回撥
'''
使用範例
# 自定義處理action主題的回撥
def action_message(client,userdata,message):
print('action topic:'+message.topic)
client.message_callback_add('topic',action_message)
如果想要刪除使用message_callback_add註冊的特定回撥
client.message_callback_remove('topic')
當代理確認訂閱時,將生成on_subscribe()回撥
on_subscribe(client, userdata, mid, granted_qos)
'''
granted_qos:一個整數列表,給出代理為每個不同訂閱請求授予的 QoS 級別
'''
使用範例
def on_subscribe(client, userdata, mid, granted_qos):
print(f"On Subscribed: qos ={granted_qos}")
client.on_subscribe = on_subscribe
當代理確認取消訂閱時,生成on_unsubscribe()回撥
def on_unsubscribe(client, userdata, mid):
print('un unsubscribe ' + userdata)
client.on_unsubscribe = on_unsubscribe
當用戶端有紀錄檔資訊時呼叫
def on_log(client, userdata, level, buf):
'''
level:訊息級別,MQTT_LOG_INFO、MQTT_LOG_NOTICE、MQTT_LOG_WARNING、MQTT_LOG_ERR和MQTT_LOG_DEBUG之一
buf:紀錄檔訊息
'''
# 可以與標準 Python 紀錄檔記錄同時使用,可以通過enable_logger方法啟用
print(buf)
client.on_log = on_log
當通訊端開啟時呼叫。使用它來向外部事件迴圈註冊通訊端以進行讀取
def on_socket_open(client, userdata, sock):
print(f'{socket} open')
client.on_socket_open = on_socket_open
當通訊端即將關閉時呼叫。使用它從外部事件迴圈中登出通訊端以進行讀取
def on_socket_close(client, userdata, sock):
print(f'{socket} close')
client.on_socket_close = on_socket_close
當對通訊端的寫操作失敗時呼叫,因為它會阻塞,例如輸出緩衝區已滿。使用它來向外部事件迴圈註冊通訊端以進行寫入
def on_socket_register_write(client, userdata, sock):
print(f'{socket} on_socket_register_write')
client.on_socket_register_write = on_socket_register_write
當對通訊端的寫操作在先前失敗後成功時呼叫。使用它從外部事件迴圈中登出通訊端以進行寫入。
def on_socket_unregister_write(client, userdata, sock):
print(f'{sock} on_socket_register_write')
client.on_socket_unregister_write = on_socket_unregister_write
# 迴圈讀取-當通訊端準備好讀取時呼叫
loop_read()
# 迴圈寫入-當通訊端準備好寫入時呼叫
loop_write()
# 每隔幾秒呼叫一次以處理訊息重試和 ping
loop_misc()
# 返回使用者端中使用的通訊端物件,以允許與其他事件迴圈進行互動,對於基於選擇的迴圈比較有用
socket()
# 如果有資料等待寫入,則返回 true,以允許將使用者端與其他事件迴圈連線起來,對於基於選擇的迴圈比較有用
want_write()
'''
回撥按以下順序調優
1. on_socket_open
2. on_socket_register_write - 零次或多次
3. on_socket_unregister_write - 零次或多次
4. on_socket_close
'''
使用者端模組提供了一些全域性輔助函數,對使用者端行為提供一些行為操作
# 匯入
import paho.mqtt.client as mqtt
import paho.mqtt.client as mqtt
#檢查主題與訂閱是否匹配
mqtt.topic_matches_sub(sub, topic)
# 返回與 CONNACK 結果關聯的錯誤字串,例如在連線回撥中傳入rc欄位輸出對應資訊
mqtt.connack_string(connack_code)
# 返回與 Paho MQTT 錯誤號關聯的錯誤字串
mqtt.error_string(mqtt_errno)
允許以一次性方式直接釋出訊息。在有一條/多條訊息要釋出到代理的情況下很有用,然後斷開連線而不需要其他任何東西,都對mqttv5協定支援,但是目前不允許在連線或者釋出訊息的時候設定屬性
向代理髮布一條訊息,然後乾淨地斷開連線
# 導包
from paho.mqtt.publish import single
# 引數
def single(topic, payload=None, qos=0, retain=False, hostname="localhost",
port=1883, client_id="", keepalive=60, will=None, auth=None,
tls=None, protocol=paho.MQTTv311, transport="tcp", proxy_args=None):
'''
topci:主題
payload:內容
qos:服務質量
retain:是否設定保留訊息
hostname:/
port:/
client_id:要使用的 MQTT 使用者端 ID。如果為「」或 None,Paho 庫將自動生成一個使用者端 ID
keepalive:心跳間隔
will:包含使用者端遺囑引數的字典,{'topic': 'topic', 'qos':1},主題是必須的,其他引數都是可選的
auth:包含使用者端身份驗證引數的字典,{'username':'liqi','password':'123456'}
預設為無,表示不使用身份驗證,如果設定的話使用者名稱是必需的,密碼是可選的
tls:包含使用者端 TLS 設定引數的字典,{'ca_certs':'ca_certs'}
ca_certs 是必需的,所有其他引數都是可選的,與tls_set方法引數一致
protocol:mqtt協定版本
transport:預設tcp,設定為「websockets」以通過 WebSockets 傳送 MQTT
proxy_args:設定MQTT連線代理。啟用對SOCKS或HTTP代理引數是字典型別
{'proxy_type':'代理型別,必選,socks.HTTP, socks.SOCKS4, or socks.SOCKS5',
'proxy_addr':'代理伺服器的IP地址或DNS名稱,必選',
'proxy_rdns':'bool,可選,是否應該執行代理查詢,遠端(True,預設)或本地(False)',
'proxy_username':'可選,SOCKS5代理的使用者名稱,SOCKS4代理的使用者id',
'proxy_password':'可選,SOCKS5代理密碼'
}
該引數對應的使用方法表示在connect() or connect_async()之前使用
'''
向代理髮布多條訊息,然後完全斷開連線
# 導包
from paho.mqtt.publish import multiple
# 引數
def multiple(msgs, hostname="localhost", port=1883, client_id="", keepalive=60,
will=None, auth=None, tls=None, protocol=paho.MQTTv311,
transport="tcp", proxy_args=None):
'''
msgs:要釋出的訊息列表。每個訊息要麼是一個字典,要麼是一個元組,如果是字典,則只有主題必須存在。預設值將用於任何缺少的引數
如果是元組,引數按欄位順序對應
其他引數與single一致
'''
允許直接訂閱和處理訊息,都包括對 MQTT v5.0 的支援,但目前不允許在連線或訂閱時設定任何屬性
訂閱一組主題並返回收到的訊息。這是一個阻塞函數
# 導包
from paho.mqtt.subscribe import simple
# 引數
def simple(topics, qos=0, msg_count=1, retained=True, hostname="localhost",
port=1883, client_id="", keepalive=60, will=None, auth=None,
tls=None, protocol=paho.MQTTv311, transport="tcp",
clean_session=True, proxy_args=None):
'''
topics:是使用者端將訂閱的主題字串。如果訂閱多個主題,可以是字串或字串列表
msg_count:訊息計數,從代理檢索的訊息數。預設為 1。如果為 1,將返回單個 MQTTMessage 物件。如果 >1,將返回 MQTTMessages 列表
retained:設定為 True 以保留訊息,設定為 False 以忽略設定了保留標誌的訊息
clean_session:對談設定
其他引數參考single
'''
使用範例
msg = simple("my/test")
rint("%s %s" % (msg.topic, msg.payload))
訂閱一組主題並使用使用者提供的回撥處理收到的訊息。
# 導包
from paho.mqtt.subscribe import callback
# 引數
def callback(callback, topics, qos=0, userdata=None, hostname="localhost",
port=1883, client_id="", keepalive=60, will=None, auth=None,
tls=None, protocol=paho.MQTTv311, transport="tcp",
clean_session=True, proxy_args=None):
'''
callback:回撥,定義一個回撥,將用於收到的每條訊息,形式與on_message類似
userdata:使用者資料,將在收到訊息時傳遞給 on_message 回撥
其他引數參考simple
'''
使用範例
# 定義一個回撥
def on_message_print(client, userdata, message)
print("%s %s" % (message.topic, message.payload))
callback(callback=on_message_print, topics="my/test")