Python MQTT使用者端 paho-mqtt

2023-05-15 18:01:00

Python中MQTT

Python有許多優秀的MQTT使用者端,比較有代表性的有paho-mqtt、hbmqtt、gmqtt等,各有特色

  • paho-mqtt 有著最優秀的檔案,程式碼風格易於理解,同時有著強大的基金會支援,目前新版本支援 MQTT 5.0

  • hbmqtt 使用 asyncio 庫實現,可以優化網路 I/O 帶來的延遲,但是程式碼風格不友好,檔案較少,不支援 MQTT 5.0,且不再維護,被原作者棄用,有一個分支amqtt正在由不同的人積極開發

  • gmqtt 同樣通過 asyncio 庫實現,相比 HBMQTT ,程式碼風格友好,最重要的是支援 MQTT 5.0

    paho-mqtt 可以說是 Python MQTT 開源使用者端庫中的佼佼者, 支援5.0、3.1.1 和 3.1 的MQTT 協定,由 Eclipse 基金會主導開發.在基金會的支援下,以每年一個版本的速度更新,穩定、持續,本文使用新版本paho-mqtt v1.6.1

    pypi地址

    開源地址

    參考文章


paho-mqtt安裝

使用pip安裝

pip3 install paho-mqtt
# virtualenv安裝和完整程式碼參考官方檔案

paho-mqtt已知的一些限制

截止1.6.1版本,當 clean_session 為 False 時,session 只儲存在記憶體中,不持久化。這意味著當用戶端重新啟動時(不僅僅是重新連線,通常是因為程式重新啟動而重新建立物件)對談丟失。這可能導致訊息丟失。

使用者端對談的以下部分丟失:

  • 已從伺服器收到但尚未完全確認的 QoS 2 訊息。

    由於使用者端將盲目地確認任何 PUBCOMP(QoS 2 事務的最後一條訊息),它不會掛起但會丟失此 QoS 2 訊息。

  • 已傳送到伺服器但尚未完全確認的 QoS 1 和 QoS 2 訊息。

    這意味著傳遞給 publish() 的訊息可能會丟失。這可以通過注意傳遞給 publish() 的所有訊息都有相應的 on_publish() 呼叫來緩解。

    這也意味著代理可能在對談中有 Qos2 訊息。由於使用者端從一個空對談開始,它不知道它並將重新使用中間。這還沒有確定。

此外,當 clean_session 為 True 時,該庫將在網路重新連線時重新發布 QoS > 0 訊息。這意味著 QoS > 0 訊息不會丟失。但是標準規定我們是否應該丟棄傳送了釋出封包的任何訊息。我們的選擇意味著我們不符合標準,並且 QoS 2 有可能被接收兩次。如果只需要一次交付的 QoS 2 保證,應該 clean_session = False

paho-mqtt使用

使用paho-mqtt實現使用者端相關功能簡單步驟如下:

  • 構造Client使用者端範例
  • 使用connect相關方法將建立的使用者端連線到代理
  • 使用loop相關方法維護和broker的通訊
  • 使用subscribe()方法訂閱主題、接收訊息
  • 使用publish()方法傳送訊息
  • 使用disconnect()斷開連線


Client使用者端

使用使用者端連線代理、訂閱等,首先我們需要先建立一個使用者端,paho-mqtt使用Client()建立使用者端範例

Client類的構造引數
# Client 原始碼 引數如下
def __init__(self, client_id="", clean_session=None, userdata=None,
           protocol=MQTTv311, transport="tcp", reconnect_on_failure=True):

Client類構造引數講解
# 引數示意
client_id = '使用者端ID,str型別,可自定義'
'''
如果長度為零或者為空,則會隨機生成一個,這種情況下,clean_session引數必須為True清除對談,因為持久對談需要client_id為唯一標識來恢復對談
'''

clean_session = '對談清除狀態,bool型別,設為True,broker在斷開連線的時候刪除該client的資訊,為False,則相當於是持久對談'
'''
clean_session在官方檔案範例預設值為True,檢視1.6.1版本原始碼預設值為None,看邏輯應該是支援mqttv5之後做的的變化
原始碼中判斷如果protocol是MQTT V5版本 clean_session不是None的話則丟擲ValueError,就是mqtt5的clean_session必須持久對談,
如果不是mqtt5的話如果clean_session is None 則將clean_session設定為True,這塊就與官方檔案的預設邏輯對應上了
'''

userdata = 'client使用者資料,傳遞給回撥函數,可以是任意型別,可以使用Clinet的 user_data_set()函數進行更新資料'

protocol = '使用者端協定的版本,預設是MQTTv311就是3.1.1版本,也可以是MQTTv31、MQTTv5版本'
'''
protocol的引數在原始碼中是以下對應關係,理論上直接傳入對應int值或者匯入MQTTv** 欄位傳入都可
MQTTv31 = 3
MQTTv311 = 4
MQTTv5 = 5
'''

transport = '底層傳輸協定,預設是使用原始tcp,值可以設定為websockets通過ws傳送mqtt'

reconnect_on_failure = 'bool型別, 連線失敗後是否自動重新connect,預設為True'
'''
官方檔案沒有reconnect_on_failure相關範例和解釋,不確定老版本有沒有該欄位
'''

建立使用者端
# -*- coding: utf-8 -*-
# @Time:     2023/5/10 16:09
# @Author:   LiQi
# @Describe:

import paho.mqtt.client as mqtt  # 匯入clinet 別名 mqtt

# 建立一個使用者端範例賦值client,client_id自定義,其他引數根據需要設定
client = mqtt.Client(client_id='muziqi')

重置使用者端
'''paho-mqtt提供reinitialise方法重新初始化已經構造好的使用者端'''
# 上面建立的使用者端clinet,直接呼叫reinitialise
client.reinitialise()
'''
reinitialise方法擁有下面三個引數
client_id="", clean_session=True, userdata=None
'''

Clinet選項函數

選項函數的作用是給構造好的使用者端新增附加選項,一般情況下在使用content連線broker前完成,比如設定Client的證書、回撥、賬號密碼等,根據具體業務使用,設定完成後使用conten連線到broker


max_inflight_messages_set

當QoS> 0的時候,可以存在的部分完成(已經傳送,還沒有確認成功的訊息)的訊息的最大數量,這些訊息可以同時通過網路流,預設為20.增加此值將消耗更多記憶體,但可以增加吞吐量

# inflight引數為要修改的數量,最小為1,小於1則丟擲ValueError
client.max_inflight_messages_set(inflight)

max_queued_messages_set

設定傳出訊息佇列中等待處理的QoS> 0的傳出訊息的最大數量,預設為0表示無限制,當佇列已滿時,其他傳出的訊息都將被丟棄,實際使用中0實際上並不是無限,目前實現最大可為65555,包括佇列中的65535條訊息+inflight的預設20條訊息,如果inflight預設值被訊息,佇列中的實際訊息數量被減少

# queue_size是要修改的最大數量
client.max_queued_messages_set(queue_size)

message_retry_set-廢棄

QoS>0 的訊息,如果傳送之後超過一定時間broker沒有響應,重發訊息的時間,單位是s,預設5s

 '''
 原始碼註釋該方法不再使用,在2.0版本刪除
 '''

ws_set_options

設定WebSocket的連線選項,構造Client時transport="websockets"才會使用,connect相關方法之前呼叫

import paho.mqtt.client as mqtt

# 傳輸協定設定為ws
client = mqtt.Client(client_id='muziqi',transport='websockets')

'''
path:broker的mqtt 路徑,以/開頭的字串
headers:頭部資訊可以是字典或者是可呼叫物件。如果是字典的話字典中額外的項被新增到websocket頭中。如果是一個可呼叫的物件,預設的websocket的頭部資訊被傳遞到這個函數,將函數結果當做新的頭不新鮮'''
client.ws_set_options(path="/mqtt", headers=None)

tls_set

設定網路加密和身份驗證選項,啟用SSL/TLS支援,connect相關方法之前呼叫

# 以下所有引數預設值均為None,根據業務自身選用
client.tls_set(ca_certs, certfile, keyfile, cert_reqs, tls_version, ciphers, keyfile_password)
'''
ca_certs:  CA 根證書的路徑,如果這是給定的唯一選項,則使用者端將以與 Web 瀏覽器類似的方式執行,要求代理擁有由ca_certs中的證書頒發機構簽名的證書,並將使用 TLS v1.2 進行通訊,但不會嘗試任何形式的身份驗證。這提供了基本的網路加密,具體取決於代理的設定方式。預設情況下,在 Python 2.7.9+ 或 3.4+ 上,使用系統的預設證書頒發機構。在較舊的 Python 版本上,此引數是必需的

certfile和keyfile: 使用者端私鑰和證書的路徑,分別指向 PEM 編碼的使用者端證書和私鑰的字串,如果這些引數不是None那麼它們將被用作基於 TLS 的身份驗證的使用者端資訊,對此功能的支援取決於代理

cert_reqs: 設定使用者端對 broker 證書的需求,預設是 ssl.CERT_REQUIRED ,表示 broker 必須提供一個證書

tls_version:使用的SSL/TLS協定版本,預設是TLS v1.2 一般不做修改

ciphers:字串,設定允許使用哪些加密方法,預設是None

keyfile_password:如果certfile和keyfile加密了需要密碼解密,可以使用該引數傳遞,如果不提供的話需要在終端輸入,目前無法定義回撥以提供密碼

'''

tls_set_context

設定網路加密和身份驗證上下文,啟用SSL/TLS支援,connect相關方法之前呼叫

client.tls_set_context(context)

'''
引數釋義
ssl.SSLContext物件,在python3.4之後,預設情況下由ssl.create_default_context()提供,一般無需自定義設定,業務需要不確定是否使用該方法的話,用tls_set or 預設上下文即可
'''

tls_insecure_set

設定對伺服器證書中的伺服器主機名進行驗證,在connect相關方法之前和 tls_set 或 tls_set_context之後呼叫

client.tls_insecure_set(value)
'''
value是bool型別,設定為True代幣不使用加密,Flase使用加密
值設定為True,可能讓惡意第三方通過 DNS 欺騙等方式冒充你的伺服器
測試程式可以使用該方法來不進行驗證
'''
# 原始碼根據tls_set選項的cert_reqs欄位來設定預設值
if cert_reqs != ssl.CERT_NONE:

    self.tls_insecure_set(False)
else:

    self.tls_insecure_set(True)

enable_logger

使用 python 紀錄檔包 logging啟用紀錄檔記錄,可以跟後面的on_log紀錄檔回撥方法同時使用

client.enable_logger(logger)
'''
logger引數用來指定記錄器
如果指定了記錄器,那麼將使用該logging.Logger物件,否則將自動建立一個
'''

Paho紀錄檔記錄級別和標準紀錄檔級別對應關係如下


disable_logger

禁用python 紀錄檔包 記錄紀錄檔,對on_log回撥沒有影響

client.disable_logger()
'''
disable_logger相當於是把enable_logger記錄器清除
兩個方法的原始碼也很簡單 
		↓↓↓
'''
def enable_logger(self, logger=None):
    if logger is None:
        if self._logger is not None:
            return
        logger = logging.getLogger(__name__)
    self._logger = logger

def disable_logger(self):
    self._logger = None

username_pw_set

設定使用者名稱和可選的代理身份驗證密碼,密碼根據broker驗證進行設定,可為空,在connect相關方法前呼叫

client.username_pw_set(username,password=None)

user_data_set

將在生成事件時傳遞給回撥方法的私有使用者資料,可以使用這個方法更新構造Client時候的userdata欄位值

client.user_data_set(userdata)

will_set

設定遺囑訊息,client沒有使用disconnect方法以外斷開連線,broker推播遺囑訊息

client.will_set(topic,payload,qos,retain,properties)
'''
topic:遺囑釋出主題
payload:預設值為None,遺囑訊息內容
qos:預設值為0,訊息質量級別
retain:bool,預設False,是否將遺囑訊息設定為保留訊息
properties:預設值為None,支援MQTTv5後新增的欄位,用來設定遺囑屬性
'''

遺囑訊息邏輯、報文、上述欄位、屬性可參考https://www.cnblogs.com/Mickey-7/p/17385599.html>


reconnect_delay_set

使用者端斷開重新連線延遲的設定

client.reconnect_delay_set(min_delay,max_delay)
'''
min_delay:預設1
max_delay:預設值120
連線丟失的時候,預設等待min_delay,每次嘗試重連後下次min_delay都將翻倍,上限時間是max_delay
連線成功將重置min_delay,Client收到broker的connack報文視為完全連線成功
'''

Client連線/重新連線/斷開連線

連線方法-connect

connect方法的作用是將構造好、設定好選項的的client連線到broker,這是一個阻塞函數

client.connect(host, port, keepalive, bind_address, bind_port,clean_start,properties)

'''
host:遠端代理的主機名或 IP 地址

prot:要連線的伺服器主機的埠,預設為1883,如果是基於SSL/TLS的MQTT的預設埠為8883,如果選項函數使用tls_set或者tls_set_context,可能需要手動提供埠

keepalive:心跳間隔時間,預設為60,單位是秒

bind_address: 預設值是空字串,如果client的本地有多個地址,可以使用該引數繫結一個地址,表示來源

bind_port:預設值是0, 與bind_address一樣,本地有多個地址,可以使用該引數繫結一個埠,表示來源

clean_start:對談設定,預設值 MQTT_CLEAN_START_FIRST_ONLY = 3, mqtt v5版本僅在第一次成功連線時使用乾淨啟動標誌,mqttv5如							果不是3則ValueError

properties:設定對談屬性
'''

clean_start和properties參考:https://www.cnblogs.com/Mickey-7/p/17361919.html


連線方法-connect_async

非同步連線方法,與loop_start結合使用以及非阻塞的方式連線,如果一直沒有呼叫loop_start,不會完成連線

client.connect_async(host, port, keepalive, bind_address, bind_port,clean_start,properties)
'''
引數參考connect方法
'''

連線方法-connect_srv

DNS連線,使用 SRV DNS查詢連線到代理以獲取代理地址

client.connect_srv(domain, keepalive, bind_address,clean_start, properties)

'''
domain:用於搜尋SRV記錄的DNS域;如果沒有則嘗試本地域名
其餘引數參考connect方法

'''
重新連線-reconnect

在使用connect相關方法連線過之後,想要重新連線可以使用reconnect方法,完全複用之前connect相關的引數進行連線

client.reconnect()
斷開連線-disconnect

主動徹底斷開和broker的連線,呼叫該方法不會等待排隊的訊息被傳送

client.disconnect()

網路迴圈

網路迴圈的函數有四種,執行在後臺,處理收發的訊息,如果不使用的話,傳入的資料不會被處理,傳出的資料不會傳送

loop

定期呼叫用以處理訊息,通過 select() 函數阻塞,直到有訊息需要收發,並根據訊息型別觸發對應的回撥函數

run_status = True:
while run_status:
	clinet.loop(timeout,max_packets)
  
'''
timeout:預設值1.0,阻塞超時時間,單位S,不能超過心跳時間keepalive,否則client會定時從broker 斷開
max_packets:預設值是1,不用設定,已經過時不使用
'''

loop_start / loop_stop

實現loop的呼叫和停止,在connect相關方法之前或之後呼叫loop_start,會在後臺執行一個執行緒欄位呼叫loop,無需編碼實現loop迴圈,

每個使用者端必須有一個loop迴圈

# 在connect相關方法之前或之後呼叫,會在後臺執行一個執行緒呼叫loop,連線中斷的時候會自動嘗試重新連線,無需編碼實現loop迴圈
client.loop_start()
# 停止loop迴圈的後臺執行緒
client.loop_stop()


loop_forever

網路迴圈的阻塞形式,loop_forever 呼叫會阻塞主執行緒,永遠不會停止,直到使用者端呼叫disconnect,會自動重新連線

client.loop_forever(timeout,max_packets,retry_first_connection)
'''
retry_first_connection:bool,預設False,在第一次連線嘗試失敗後是否應該進行重試,與reconnect_on_failure不同,它隻影響第一次連線,如果設定為True,當第一次連線失敗時,paho-mqtt會丟擲OSError/WebsocketConnectionError異常
'''
loop和loop_forever的區別
  • loop方法:預設呼叫一下只回圈一次,然後返回。這意味著它只會執行一次通訊迴圈,不使用start方法的話需要無限while 並且在完成後將返回到呼叫程式。

  • loop_forever方法:它是一個無限迴圈,如果沒有意外情況,將一直保持執行。 它在連線丟失或出現其他錯誤時會自動重新連線。因此,它可以用於長期執行的應用程式,以保持MQTT使用者端連線

loop_start和loop_forever的區別
  • loop_start函數是一個非阻塞的函數,可以啟動一個執行緒來處理MQTT使用者端的網路通訊和事件處理。該函數返回後,MQTT使用者端將會在後臺執行緒中執行,不會阻塞當前執行緒。在呼叫loop_start函數後,可以分別呼叫connect、publish、subscribe等方法來進行MQTT通訊。但是在使用完MQTT使用者端後,需要呼叫loop_stop函數來停止後臺執行緒

  • loop_forever函數是一個阻塞的函數,可以啟動一個執行緒來處理MQTT使用者端的網路通訊和事件處理。該函數會一直阻塞當前執行緒,直到MQTT使用者端接收到disconnect訊息或者呼叫了disconnect函數。在呼叫loop_forever函數後,不能再使用connect、publish、subscribe等方法來進行MQTT通訊,因為當前執行緒已經被阻塞。只有在呼叫了disconnect函數後,才會解除阻塞

  • 因此,loop_start和loop_forever的區別在於是否阻塞當前執行緒。如果需要在MQTT使用者端後臺執行並且繼續使用當前執行緒進行其他操作,可以使用loop_start函數;如果需要一直阻塞當前執行緒直到MQTT使用者端退出,可以使用loop_forever函數


釋出訊息
publish

使用者端傳送訊息到broker,broker將訊息轉發到訂閱匹配主題的使用者端

client.publish(topic, payload, qos, retain, properties)

'''
topic:訊息釋出的主題
payload:預設None,要傳送的實際訊息,如果沒有或者為None,將傳送長度為0的訊息,payload傳遞int或者float會被預設轉換為str
        傳送真正型別的int、float需要使用struct.pack()建立 
qps:訊息之類預設0
retain:是否設定保留訊息
properties:設定保留訊息的屬性
'''
釋出訊息的返回

訊息傳送之後會返回一個MQTTMessageInfo物件,有以下屬性和方法

  • rc:釋出的結果。可以是MQTT_ERR_SUCCESS表示成功,MQTT_ERR_NO_CONN表示使用者端當前未連線,或者當使用 max_queued_messages_set時,MQTT_ERR_QUEUE_SIZE表示訊息既沒有排隊也沒有傳送
  • mid:釋出請求的訊息ID。 mid值可以通過檢查on_publish()回撥中的mid引數來跟蹤釋出請求,如果已定義,更容易使用wait_for_publish
  • wait_for_publish() :將阻塞直到訊息被髮布。如果訊息未排隊(rc == MQTT_ERR_QUEUE_SIZE),將引發 ValueError,如果釋出時出現錯誤,則引發 RuntimeError,很可能是由於使用者端未連線
  • is_published:如果訊息已釋出返回True。如果訊息未排隊(rc == MQTT_ERR_QUEUE_SIZE),它將引發ValueError,如果釋出時出現錯誤,則可能是由於使用者端未連線而引發RuntimeErro

如果主題為None,長度為零或無效(包含萬用字元),如果qos不是0、1 或 2 之一,或者有效負載的長度大於 268435455 位元組,則會引發ValueError


訂閱、退訂主題
subscribe

使用者端訂閱主題方法,可以通過3種方式訂閱,mqttv5還有3種方式

client.subscribe(topic, qos, options, properties)

topic = '訂閱的主題'
qos = '服務質量預設為0'
options:'訂閱選項,mqtt v5 使用'
'''
訂閱的選項必須是SubscribeOptions物件,位於 from paho.mqtt.subscribeoptions import SubscribeOptions
SubscribeOptions的構造引數如下

qos:與MQTT v3.1.1中相同。

noLocal:True或False。如果設定為True,則訂閱者不會收到自己的釋出。

retainAsPublished:True或False。如果設定為True,則收到的釋出的retain標誌將按釋出者設定。

retainHandling:RETAIN_SEND_ON_SUBSCRIBE,RETAIN_SEND_IF_NEW_SUB或RETAIN_DO_NOT_SEND
                控制代理何時傳送保留訊息:
                RETAIN_SEND_ON_SUBSCRIBE:在任何成功的訂閱請求上
                RETAIN_SEND_IF_NEW_SUB:僅在訂閱請求是新的情況下
                RETAIN_DO_NOT_SEND:從不傳送保留訊息

'''



properties = '設定MQTT v5.0屬性的properties範例,可選-如果不設定,則不傳送任何屬性'

字串和整數訂閱

client.subscribe('my/topic',2)
'''
topic:指定要訂閱的訂閱主題的字串。
qos:訂閱所需的服務質量級別,預設為0。
選項和屬性:未使用
'''

字串和整數的元組

client.subscribe(('my/topic',1))
'''
topic: (topic, qos)的元組
qos、選項和屬性:未使用
'''

字串和整數元組的列表

client.subscribe([('my/topcic',0),('my/topic2',1)])
'''
這允許在單個 SUBSCRIPTION 命令中進行多個主題訂閱,這比使用多次呼叫subscribe()更有效
(topic, qos)的元組列表。topic 和 qos 都必須存在於所有元組中
qos、選項和屬性:未使用
'''

字串和訂閱選項(僅MQTT v5.0)

from paho.mqtt.subscribeoptions import SubscribeOptions

client.subscribe('my/topic',options=SubscribeOptions(qos=2))
'''
topic:訂閱主題。
qos:未使用。
選項:MQTT v5.0訂閱選項。
properties:屬性未設定
'''

字串和訂閱選項元組(僅MQTT v5.0)

from paho.mqtt.subscribeoptions import SubscribeOptions

client.subscribe(('my/topic', SubscribeOptions(qos=1)))
'''
topic: (topic, SubscribeOptions)的元組。主題和訂閱選項必須出現在元組中
qos、options、roperties未使用
'''

字串和訂閱選項元組列表(僅MQTT v5.0)

from paho.mqtt.subscribeoptions import SubscribeOptions
client.subscribe([('my/topic', SubscribeOptions(qos=1)),('my/topic2', SubscribeOptions(qos=2))])
'''
允許在單個SUBSCRIPTION中進行多個主題訂閱命令,比使用多個呼叫更有效
topic:格式元組(topic, SubscribeOptions)的列表。主題和訂閱選項必須出現在所有元組中。
qos和options:未使用。
properties:未設定
'''

訂閱函數的返回

返回一個元組(result, mid),其中result為MQTT_ERR_SUCCESS表示成功或(MQTT_ERR_NO_CONN, None)使用者端當前未連線。

mid是訂閱請求的訊息 ID。mid 值可用於通過檢查on_subscribe()回撥中的 mid 引數(如果已定義)來跟蹤訂閱請求。

如果qos不是 0、1 或 2,或者主題為None或字串長度為零,或者主題不是字串、元組或列表,則引發ValueError


unsubscribe

取消訂閱一個或多個主題

client.unsubscribe(topic,properties)
'''
topic:一個主題字串或主題字串列表,取消訂閱的訂閱主題
properties:預設值為None,mqttv5的訂閱屬性
'''
取消訂閱函數的返回

返回一個元組(result, mid),其中result是MQTT_ERR_SUCCESS表示成功,或者(MQTT_ERR_NO_CONN, None)如果使用者端當前未連線。mid是取消訂閱請求的訊息 ID。mid 值可用於通過檢查on_unsubscribe()回撥中的 mid 引數(如果已定義)來跟蹤取消訂閱請求。

如果主題為None或字串長度為零,或者不是字串或列表,則引發ValueError 。

回撥方法

回撥是為響應事件而呼叫的函數,使用回撥需要做兩件事情,1.建立回撥函數,2.將函數分配給回撥

on_connect

使用者端連線、重新連線broker後,連線的回撥方法,當用戶端收到broker的CONNACK響應的時候,會觸發on_connect()回撥

on_connect(client, userdata, flags, rc)
'''
client:觸發回撥的使用者端範例
userdata:在Client()或者user_data_set()中設定的使用者資料
flags:一個包含來自代理的響應標誌的字典,僅使用 clean session 設定為 0。如果 clean session=0 的使用者端重新連線到它之前連線過             			的代理,則此標誌指示代理是否仍然具有使用者端的對談資訊。如果為 1,對談仍然存在
rc:連線結果,0:連線成功 1:連線被拒絕 - 協定版本不正確 2:連線被拒絕 - 使用者端識別符號無效 3:連線被拒絕 - 伺服器不可用 
					4:連線被拒絕 - 使用者名稱或密碼錯誤 5:連線被拒絕 - 未授權 6-255:當前未使用

'''

使用範例

# 定義回撥的方法  
def on_connect(client, userdata, flags, rc):
     print("Connected with result code: " + str(rc))

# 設定使用者端的回撥on_connect屬性為定義的on_connect方法
client.on_connect = on_connect

on_disconnect

當用戶端與代理斷開時觸發

on_disconnect(client, userdata, rc)
'''
client:觸發的使用者端範例
userdata:使用者資料
rc:斷開連線結果,如果是0,則回撥是為了響應disconnect()呼叫而呼叫的。如果有任何其他值,則斷開連線是意外的,例如可能由網路錯誤引起的
'''

使用範例

# 定義回撥函數
def on_disconnect(client, userdata, rc):
  
    print("Unexpected disconnection %s" % rc)
    
# 設定使用者端的回撥on_disconnect屬性為定義的on_disconnect方法
client.on_disconnect = on_disconnect

on_publish

當Clinet傳送訊息到broker,會觸發on_publish()回撥

on_publish(client, userdata, mid)
'''
mid:訊息ID
'''

使用範例

def on_publish(client, userdata, mid):
  	print(mid,'message publish')
'''
觸發該回撥
對於 QoS 級別 1 和 2 的訊息,意味著適當的握手已經完成
對於 QoS 0,這僅表示訊息已離開使用者端。mid變數匹配從相應的publish()呼叫返回的 mid 變數,以允許跟蹤傳出訊息。
這個回撥很重要,因為即使 publish() 呼叫返回成功,也並不總是意味著訊息已傳送
'''

on_message 和 message_callback_add

on_message,當 client 接收到已訂閱的話題的訊息並且與message_callback_add自定義主題篩選不匹配的時候,會呼叫 on_message() 回撥函數

on_message(client, userdata, message)
'''
message:MQTTMessage類的範例。有topic, payload, qos, retain屬性
'''

使用範例

def on_message(client, userdata, message):
    print("主題:" + msg.topic + " 訊息:"+ str(message.payload.decode("utf-8")))
                                          
client.on_message = on_message

message_callback_add,用於處理特定訂閱過濾器的傳入訊息,包括使用萬用字元,message_callback_add自定義是自定義的一個主題篩選過濾器,比如我用message_callback_add 篩選主題 A,有A的訊息觸發message_callback_add回撥,否則觸發on_message

message_callback_add(sub, callback)
'''
sub:此回撥進行匹配的訂閱篩選器
callback:要使用的回撥
'''

使用範例

# 自定義處理action主題的回撥
def action_message(client,userdata,message):
		print('action topic:'+message.topic)

client.message_callback_add('topic',action_message)

如果想要刪除使用message_callback_add註冊的特定回撥

client.message_callback_remove('topic')

on_subscribe

當代理確認訂閱時,將生成on_subscribe()回撥

on_subscribe(client, userdata, mid, granted_qos)
'''
granted_qos:一個整數列表,給出代理為每個不同訂閱請求授予的 QoS 級別
'''

使用範例

def on_subscribe(client, userdata, mid, granted_qos):
		print(f"On Subscribed: qos ={granted_qos}")
    
client.on_subscribe = on_subscribe

on_unsubscribe

當代理確認取消訂閱時,生成on_unsubscribe()回撥

def on_unsubscribe(client, userdata, mid):
  print('un unsubscribe ' + userdata)
  
client.on_unsubscribe = on_unsubscribe

on_log

當用戶端有紀錄檔資訊時呼叫

def on_log(client, userdata, level, buf):
    '''
    level:訊息級別,MQTT_LOG_INFO、MQTT_LOG_NOTICE、MQTT_LOG_WARNING、MQTT_LOG_ERR和MQTT_LOG_DEBUG之一
    buf:紀錄檔訊息
    '''
    # 可以與標準 Python 紀錄檔記錄同時使用,可以通過enable_logger方法啟用
    print(buf)
  
client.on_log = on_log

on_socket_open

當通訊端開啟時呼叫。使用它來向外部事件迴圈註冊通訊端以進行讀取

def on_socket_open(client, userdata, sock):
		print(f'{socket} open')

client.on_socket_open = on_socket_open

on_socket_close

當通訊端即將關閉時呼叫。使用它從外部事件迴圈中登出通訊端以進行讀取

def on_socket_close(client, userdata, sock):
				print(f'{socket} close')

client.on_socket_close = on_socket_close

on_socket_register_write

當對通訊端的寫操作失敗時呼叫,因為它會阻塞,例如輸出緩衝區已滿。使用它來向外部事件迴圈註冊通訊端以進行寫入

def on_socket_register_write(client, userdata, sock):
  	print(f'{socket} on_socket_register_write')
  
client.on_socket_register_write = on_socket_register_write

on_socket_unregister_write

當對通訊端的寫操作在先前失敗後成功時呼叫。使用它從外部事件迴圈中登出通訊端以進行寫入。

def on_socket_unregister_write(client, userdata, sock):
  	print(f'{sock} on_socket_register_write')

client.on_socket_unregister_write = on_socket_unregister_write

外包事件迴圈支援
# 迴圈讀取-當通訊端準備好讀取時呼叫
loop_read()

# 迴圈寫入-當通訊端準備好寫入時呼叫
loop_write()

# 每隔幾秒呼叫一次以處理訊息重試和 ping
loop_misc()

# 返回使用者端中使用的通訊端物件,以允許與其他事件迴圈進行互動,對於基於選擇的迴圈比較有用
socket()

# 如果有資料等待寫入,則返回 true,以允許將使用者端與其他事件迴圈連線起來,對於基於選擇的迴圈比較有用
want_write()

'''
回撥按以下順序調優
1. on_socket_open 
2. on_socket_register_write - 零次或多次
3. on_socket_unregister_write - 零次或多次
4. on_socket_close
'''

全域性輔助函數

使用者端模組提供了一些全域性輔助函數,對使用者端行為提供一些行為操作


檢查輔助函數
# 匯入
import paho.mqtt.client as mqtt
topic_matches_sub
import paho.mqtt.client as mqtt
#檢查主題與訂閱是否匹配
mqtt.topic_matches_sub(sub, topic)

connack_string
# 返回與 CONNACK 結果關聯的錯誤字串,例如在連線回撥中傳入rc欄位輸出對應資訊
mqtt.connack_string(connack_code)

error_string
# 返回與 Paho MQTT 錯誤號關聯的錯誤字串
mqtt.error_string(mqtt_errno)

釋出輔助函數

允許以一次性方式直接釋出訊息。在有一條/多條訊息要釋出到代理的情況下很有用,然後斷開連線而不需要其他任何東西,都對mqttv5協定支援,但是目前不允許在連線或者釋出訊息的時候設定屬性

single

向代理髮布一條訊息,然後乾淨地斷開連線

# 導包
from paho.mqtt.publish import single

# 引數
def single(topic, payload=None, qos=0, retain=False, hostname="localhost",
           port=1883, client_id="", keepalive=60, will=None, auth=None,
           tls=None, protocol=paho.MQTTv311, transport="tcp", proxy_args=None):
  
'''
topci:主題

payload:內容

qos:服務質量

retain:是否設定保留訊息

hostname:/
port:/

client_id:要使用的 MQTT 使用者端 ID。如果為「」或 None,Paho 庫將自動生成一個使用者端 ID

keepalive:心跳間隔
will:包含使用者端遺囑引數的字典,{'topic': 'topic',  'qos':1},主題是必須的,其他引數都是可選的

auth:包含使用者端身份驗證引數的字典,{'username':'liqi','password':'123456'}
		 預設為無,表示不使用身份驗證,如果設定的話使用者名稱是必需的,密碼是可選的
		 
tls:包含使用者端 TLS 設定引數的字典,{'ca_certs':'ca_certs'}
    ca_certs 是必需的,所有其他引數都是可選的,與tls_set方法引數一致
    
protocol:mqtt協定版本

transport:預設tcp,設定為「websockets」以通過 WebSockets 傳送 MQTT

proxy_args:設定MQTT連線代理。啟用對SOCKS或HTTP代理引數是字典型別
					 {'proxy_type':'代理型別,必選,socks.HTTP, socks.SOCKS4, or socks.SOCKS5',
					  'proxy_addr':'代理伺服器的IP地址或DNS名稱,必選',
					  'proxy_rdns':'bool,可選,是否應該執行代理查詢,遠端(True,預設)或本地(False)',
					  'proxy_username':'可選,SOCKS5代理的使用者名稱,SOCKS4代理的使用者id',
					  'proxy_password':'可選,SOCKS5代理密碼'
					 }
					 該引數對應的使用方法表示在connect() or connect_async()之前使用
				 	   
'''

multiple

向代理髮布多條訊息,然後完全斷開連線

# 導包
from paho.mqtt.publish import multiple

# 引數
def multiple(msgs, hostname="localhost", port=1883, client_id="", keepalive=60,
             will=None, auth=None, tls=None, protocol=paho.MQTTv311,
             transport="tcp", proxy_args=None):

'''
msgs:要釋出的訊息列表。每個訊息要麼是一個字典,要麼是一個元組,如果是字典,則只有主題必須存在。預設值將用於任何缺少的引數
		 如果是元組,引數按欄位順序對應
其他引數與single一致
'''



訂閱輔助函數

允許直接訂閱和處理訊息,都包括對 MQTT v5.0 的支援,但目前不允許在連線或訂閱時設定任何屬性

simple

訂閱一組主題並返回收到的訊息。這是一個阻塞函數

# 導包
from paho.mqtt.subscribe import simple

# 引數
def simple(topics, qos=0, msg_count=1, retained=True, hostname="localhost",
           port=1883, client_id="", keepalive=60, will=None, auth=None,
           tls=None, protocol=paho.MQTTv311, transport="tcp",
           clean_session=True, proxy_args=None):


'''
topics:是使用者端將訂閱的主題字串。如果訂閱多個主題,可以是字串或字串列表
msg_count:訊息計數,從代理檢索的訊息數。預設為 1。如果為 1,將返回單個 MQTTMessage 物件。如果 >1,將返回 MQTTMessages 列表
retained:設定為 True 以保留訊息,設定為 False 以忽略設定了保留標誌的訊息
clean_session:對談設定

其他引數參考single

'''

使用範例

msg = simple("my/test")
rint("%s %s" % (msg.topic, msg.payload))

callback

訂閱一組主題並使用使用者提供的回撥處理收到的訊息。

# 導包
from paho.mqtt.subscribe import callback

# 引數
def callback(callback, topics, qos=0, userdata=None, hostname="localhost",
             port=1883, client_id="", keepalive=60, will=None, auth=None,
             tls=None, protocol=paho.MQTTv311, transport="tcp",
             clean_session=True, proxy_args=None):
  
'''
callback:回撥,定義一個回撥,將用於收到的每條訊息,形式與on_message類似
userdata:使用者資料,將在收到訊息時傳遞給 on_message 回撥
其他引數參考simple
'''

使用範例

# 定義一個回撥
def on_message_print(client, userdata, message)
    print("%s %s" % (message.topic, message.payload))

callback(callback=on_message_print, topics="my/test")