基於Locust實現MQTT協定服務的壓測指令碼

2023-03-24 06:07:09

最近在忙業務的間隙,穿插著做了些效能測試。

一、背景簡介

業務背景大概介紹一下,就是按照國標規定,車輛需要上傳一些指定的資料到ZF的指定平臺,同時車輛也會把資料傳到企業雲端服務上,於是乎就產生了一些效能需求。

目前我們只是先簡單的進行了一個效能場景的測試,就是評估目前服務是否能夠支援,預期的最大同時線上車輛上傳資料。經過評估,線上車輛資料按照預期的10倍來進行的,並且後面增加持續執行12h檢視服務鏈路的穩定性。

本篇並不是一個嚴謹的效能測試過程結果分享,主要是分享下關於mqtt協定服務的壓測指令碼的編寫。因為之前我也沒接觸過MQTT協定的壓測,網上關於相關的壓測指令碼的內容也比較雜亂,所以記錄一下,僅供參考。

捋一下鏈路就知道需要生成哪些資料(因為服務還未上線使用,所以產生的壓測資料後面可以直接清理掉即可。):

  1. 一些前置資料:比如資料庫、快取裡涉及到的車輛資料,通訊祕鑰資料等等,這些可以之前寫指令碼一次性生成即可。
  2. 車輛上報的資料:車輛上報到雲端的資料,是經過一系列加密轉碼,期間還要設計到解密等,這個經過評估,可以簡化其中的某些環境,所以所有的車可以直接傳送相同的資料即可。
  3. 車輛資料:最後就是生成對應的車輛資料,同時線上,按照評估的頻率傳送資料。

其中第1、2的資料在之前針對性的分別生成即可,第3步的車輛傳送資料就是壓測指令碼要乾的事情了。

二、技術選型

這個倒是很快,搜尋引擎大概搜了一下,內容很少,或者說對我有用的內容很少。有看到jmeter有相關外掛的,但是這個方案基本上我都是否決的,一來我不擅長用,而來我覺得用起來肯定會比自己編碼要麻煩的多。

所以就繼續編碼好了,仍然首選python,想到了locust庫,後來看官方檔案的時候,看到locust也針對mqtt協定拓展了一些內容。但是我嘗試下來不太符合我這的需求,也可能當時我用的不對吧,所以就只能自己來從零開始編寫了。

搜尋中又發現Python中用於mqtt協定的庫叫paho.mqtt,支援連線代理,訊息的訂閱、收發等等,於是最後確定使用:locust+paho.mqtt的組合來實現本次的負載指令碼。

三、程式碼編寫

1. 指令碼程式碼

暫時沒做程式碼分層,目前場景簡單,就直接都放一個模組裡了,有點長,先貼上來,後面部分會對指令碼的重點內容進行拆解。

指令碼目前做了這些事情:

  • 從db中查詢有效可用的所有測試車輛資訊資料
  • 根據命令列的輸入引數,指定啟動的車輛數,以及與broker代理建立連線的頻率
  • 建立連線成功的車輛,就可以根據指令碼裡指定的頻次,來像broker傳送資料
  • 指令碼統計連線數、請求數、響應時間等資訊寫到報表中
  • 偵錯遇到車輛會批次斷開連線的情況,增加了當車輛斷開連線時,把斷開時間、車輛資訊寫到本地csv中,方便第二天來檢視分析。
import csv
import datetime
import queue
import os
import sys
import time
import ssl

from paho.mqtt import client as mqtt_client

# 根據不同系統進行路徑適配
if os.name == "nt":
    path = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
    sys.path.insert(0, path)
    from GB_test.utils.mysql_operating import DB
elif os.name == "posix":
    sys.path.append("/app/qa_test_app/")
    from GB_test.utils.mysql_operating import DB

from locust import User, TaskSet, events, task, between, run_single_user


BROKER_ADDRESS = "broker服務地址"
PORT = 1111
PASSWORD = "111111"
PUBLISH_TIMEOUT = 10000  # 超時時間
TEST_TOPIC = "test_topic"

TEST_VALUE = [16, 3, -26, 4, 0, 36,.......]  # 用來publish的測試資料,僅示意

BYTES_DATA = bytes(i % 256 for i in TEST_VALUE)  # 業務需要轉換成 byte 型別後再傳送

# 建立佇列
client_queue = queue.Queue()

# 連線DB,讀取車輛資料
db = DB("db_vmd")
select_sql = "select xxxx"  
client_list = db.fetch_all(select_sql)
print("車輛資料查詢完畢,資料量:{}".format(len(client_list)))
for t in client_list:
    # 把可用的車輛資訊存到佇列中去
    client_queue.put(t)


def fire_success(**kwargs):
    """請求成功時呼叫"""
    events.request.fire(**kwargs)


def calculate_resp_time(t1, t2):
    """計算響應時間"""
    return int((t2 - t1) * 1000)


class MQTTMessage:
    """已傳送的訊息實體類"""
    def __init__(self, _type, qos, topic, payload, start_time, timeout):
        self.type = _type,
        self.qos = qos,
        self.topic = topic
        self.payload = payload
        self.start_time = start_time
        self.timeout = timeout


# 統計總共傳送成功的訊息數量
total_published = 0
disconnect_record_list = []  # 定義存放連線斷開的記錄的列表容器


class PublishTask(TaskSet):

    @task
    def task_publish(self):
        self.client.loop_start()
        topic = TEST_TOPIC
        payload = BYTES_DATA
        # 記錄傳送的開始時間
        start_time = time.time()
        mqtt_msg_info = self.client.publish(topic, payload, qos=1, retain=False)
        published_mid = mqtt_msg_info.mid
        # 將傳送成功的訊息內容,放入client範例的 published_message 欄位
        self.client.published_message[published_mid] = MQTTMessage(REQUEST_TYPE,
                                                                   0,
                                                                   topic,
                                                                   payload,
                                                                   start_time,
                                                                   PUBLISH_TIMEOUT)
        # 傳送成功回撥
        self.client.on_publish = self.on_publish
        # 斷開連線回撥
        self.client.on_disconnect = self.on_disconnect

    @staticmethod
    def on_disconnect(client, userdata, rc):
        """ broker連線斷開,放入列表容器"""
        disconnected_info = [str(client._client_id), rc, datetime.datetime.now()]
        disconnect_record_list.append(disconnected_info)
        print("rc狀態:{} - -".format(rc), "{}-broker連線已斷開".format(str(client._client_id)))

    @staticmethod
    def on_publish(client, userdata, mid):
        if mid:
            # 記錄訊息傳送成功的時間
            end_time = time.time()
            # 從已傳送的訊息容器中,取出訊息
            message = client.published_message.pop(mid, None)
            # 計算開始傳送到傳送成功的耗時
            publish_resp_time = calculate_resp_time(message.start_time, end_time)
            fire_success(
                request_type="p_success",
                name="client_id: " + str(client._client_id),
                response_time=publish_resp_time,
                response_length=len(message.payload),
                exception=None,
                context=None
            )
            global total_published
            # 成功傳送累加1
            total_published += 1


class MQTTLocustUser(User):
    tasks = [PublishTask]
    wait_time = between(2, 2)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # 從佇列中獲取使用者端 username 和 client_id
        current_client = client_queue.get()

        self.client = mqtt_client.Client(current_client[1])
        self.client.username_pw_set(current_client[0], PASSWORD)
        # self.client.username_pw_set(current_client[0] + "1", PASSWORD)  # 模擬client連線報錯

        # 定義一個容器,存放已傳送的訊息
        self.client.published_message = {}

    def on_start(self):
        # 設定tls
        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        self.client.tls_set_context(context)

        self.client.connect(host=BROKER_ADDRESS, port=PORT, keepalive=60)
        self.client.on_connect = self.on_connect

    def on_stop(self):
        print("publish 成功, 當前已成功傳送數量:{}".format(total_published))
        if len(disconnect_record_list) == 0:
            print("無斷開連線的client")
        else:
            # 把斷開記錄裡的資訊寫入csv
            with open("disconnect_record.csv", "w", newline='', encoding='UTF8') as csvfile:
                writer = csv.writer(csvfile)
                writer.writerow(['client_id', 'rc_status', 'disconnected_time'])
                for i in disconnect_record_list:
                    writer.writerow(i)
            print("斷開連線的client資訊已寫入csv檔案")

    @staticmethod
    def on_connect(client, userdata, flags, rc, props=None):
        if rc == 0:
            print("rc狀態:{} - -".format(rc), "{}-連線broker成功".format(str(client._client_id)))
            fire_success(
                request_type="c_success",
                name='count_connected',
                response_time=0,
                response_length=0,
                exception=None,
                context=None
            )
        else:
            print("rc狀態:{} - -".format(rc), "{}-連線broker失敗".format(str(client._client_id)))
            fire_success(
                request_type="c_fail",
                name="client_id: " + str(client._client_id),
                response_time=0,
                response_length=0,
                exception=None,
                context=None
            )


if __name__ == '__main__':
    run_single_user(MQTTLocustUser)

2. 程式碼分析-locust庫部分

並行請求能力還是使用的locust庫的能力。官方只提供了http協定介面的相關類,沒直接提供mqtt協定的,但是我們可以按照官方的規範,自定義相關的類,只要繼承UserTaskSet即可。

User

首先是先定義User類,這裡就是用來生成我要用來測試的車輛。

類初始化的時候,黃色框裡,會去佇列裡取出車輛資訊,用來做一些相關的設定。client來源於from paho.mqtt import client as mqtt_client提供的能力,固定用法,按照人家的檔案使用就行。

紅色框裡,是User類的2個重要熟悉屬性:

  • tasks: 這裡定義了生成的使用者需要去幹哪些事情,也就是對應指令碼裡的PublishTask類下面定義的內容。
  • wait_time: 使用者在執行task時間隔停留的時間,可以是個區間,在裡面隨機。我這裡意思是每2s傳送一次資料到broker。

綠色框裡,定義了一個字典容器,用來存放當前使用者已傳送成功的訊息內容,因為後面我要取出來把裡面相關的資料寫到生成的報表中去。

藍色框裡有2個方法,也是locust提供的能力:

  • on_start:當用戶開始執行時呼叫,這裡我做了車輛連線broker代理的處理,注意這裡需要設定tls,因為服務連線需要。

  • on_stop:當用戶結束執行時呼叫,這裡我做了一些其他的處理,比如把執行期間斷開連線的車輛資訊寫到本地csv中。

TaskSet

定義好User類,就需要來定義TaskSet類,你得告訴產生出來的使用者,要乾點啥。

我這根據業務需要,就是讓車輛不停的像broker傳送資料即可。

紅色部分,同樣是paho.mqtt提供的能力,會啟動新的執行緒去執行你定義的事情。

黃色部分,就是做傳送資料的操作,並且我可以拿到一些返回,檢視原始碼就可以知道返回的是MQTTMessageInfo類。

注意返回的2個屬性:

  • mid: 返回這個訊息傳送的順序
  • rc: 表示傳送的響應狀態,0 就是成功

綠色部分,還記得我在上面的User類中定義了一個容器,在這裡就把傳送的訊息相關資訊放到容器中去,留著後面使用。

2. 程式碼分析-paho.mqtt庫部分

上面的程式碼已經用到了不少paho.mqtt的能力,這裡再進行整體梳理下。

  • client.Client():宣告一個client
  • client.username_pw_set(): 設定使用者端的使用者名稱,密碼
  • client.tls_set_context: 設定ssl模式
  • client.connect(): 連線代理
  • client.publish:向代理推播訊息

還用到了一些回撥函數:

  • on_connect:連線操作成功時回撥
  • on_publish:釋出成功時回撥
  • on_disconnect:使用者端與代理斷開連線時回撥

另外還用到了一個事件函數events.request

當用戶端傳送請求時會呼叫,不管是請求成功還是請求失敗;當我需要自定義我的報告內容時,就需要用到這個event

檢視原始碼,知道里面要傳哪些引數,那我們在呼叫時候就需要傳入對應的引數。

比如我在傳送回撥函數裡呼叫了該方法。

所以最後在控制檯顯示的報告裡就有我定義的內容了。

由於後來在使用中發現,不知道會在什麼時候出現批次斷開的情況,於是在on_disconnect回撥函數裡增加了對應處理,把相關的斷開資訊記錄下來,執行結束的時候寫到本地檔案裡去。

後來我主動嘗試使用者端斷開的情況測試了下檔案的寫入結果,功能正常。

三、小結

後面就開始執行了,在執行過程中,開發關注鏈路服務的各項指標,這裡就不展開了,業務纏身就並沒有過多的去做這個事情,況且也不專業。確實也發現了不少問題,後面逐步優化,再繼續測試。

現在穩定執行12h,服務正常,暫時就先告一段落了。後面還有會相關其他效能測試場景,屆時就可以針對性的展開分享下了。

另外,這個指令碼分享也只是僅供參考,現在我這是使用簡單,本著能用就行,可能存在一些不合理需要優化的地方,有需要的朋友還請自行查閱相關檔案。