最近在忙業務的間隙,穿插著做了些效能測試。
業務背景大概介紹一下,就是按照國標規定,車輛需要上傳一些指定的資料到ZF的指定平臺,同時車輛也會把資料傳到企業雲端服務上,於是乎就產生了一些效能需求。
目前我們只是先簡單的進行了一個效能場景的測試,就是評估目前服務是否能夠支援,預期的最大同時線上車輛上傳資料。經過評估,線上車輛資料按照預期的10倍來進行的,並且後面增加持續執行12h檢視服務鏈路的穩定性。
本篇並不是一個嚴謹的效能測試過程結果分享,主要是分享下關於mqtt協定服務的壓測指令碼的編寫。因為之前我也沒接觸過MQTT協定的壓測,網上關於相關的壓測指令碼的內容也比較雜亂,所以記錄一下,僅供參考。
捋一下鏈路就知道需要生成哪些資料(因為服務還未上線使用,所以產生的壓測資料後面可以直接清理掉即可。):
其中第1、2的資料在之前針對性的分別生成即可,第3步的車輛傳送資料就是壓測指令碼要乾的事情了。
這個倒是很快,搜尋引擎大概搜了一下,內容很少,或者說對我有用的內容很少。有看到jmeter有相關外掛的,但是這個方案基本上我都是否決的,一來我不擅長用,而來我覺得用起來肯定會比自己編碼要麻煩的多。
所以就繼續編碼好了,仍然首選python,想到了locust
庫,後來看官方檔案的時候,看到locust
也針對mqtt
協定拓展了一些內容。但是我嘗試下來不太符合我這的需求,也可能當時我用的不對吧,所以就只能自己來從零開始編寫了。
搜尋中又發現Python
中用於mqtt
協定的庫叫paho.mqtt
,支援連線代理,訊息的訂閱、收發等等,於是最後確定使用:locust
+paho.mqtt
的組合來實現本次的負載指令碼。
暫時沒做程式碼分層,目前場景簡單,就直接都放一個模組裡了,有點長,先貼上來,後面部分會對指令碼的重點內容進行拆解。
指令碼目前做了這些事情:
import csv
import datetime
import queue
import os
import sys
import time
import ssl
from paho.mqtt import client as mqtt_client
# 根據不同系統進行路徑適配
if os.name == "nt":
path = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
sys.path.insert(0, path)
from GB_test.utils.mysql_operating import DB
elif os.name == "posix":
sys.path.append("/app/qa_test_app/")
from GB_test.utils.mysql_operating import DB
from locust import User, TaskSet, events, task, between, run_single_user
BROKER_ADDRESS = "broker服務地址"
PORT = 1111
PASSWORD = "111111"
PUBLISH_TIMEOUT = 10000 # 超時時間
TEST_TOPIC = "test_topic"
TEST_VALUE = [16, 3, -26, 4, 0, 36,.......] # 用來publish的測試資料,僅示意
BYTES_DATA = bytes(i % 256 for i in TEST_VALUE) # 業務需要轉換成 byte 型別後再傳送
# 建立佇列
client_queue = queue.Queue()
# 連線DB,讀取車輛資料
db = DB("db_vmd")
select_sql = "select xxxx"
client_list = db.fetch_all(select_sql)
print("車輛資料查詢完畢,資料量:{}".format(len(client_list)))
for t in client_list:
# 把可用的車輛資訊存到佇列中去
client_queue.put(t)
def fire_success(**kwargs):
"""請求成功時呼叫"""
events.request.fire(**kwargs)
def calculate_resp_time(t1, t2):
"""計算響應時間"""
return int((t2 - t1) * 1000)
class MQTTMessage:
"""已傳送的訊息實體類"""
def __init__(self, _type, qos, topic, payload, start_time, timeout):
self.type = _type,
self.qos = qos,
self.topic = topic
self.payload = payload
self.start_time = start_time
self.timeout = timeout
# 統計總共傳送成功的訊息數量
total_published = 0
disconnect_record_list = [] # 定義存放連線斷開的記錄的列表容器
class PublishTask(TaskSet):
@task
def task_publish(self):
self.client.loop_start()
topic = TEST_TOPIC
payload = BYTES_DATA
# 記錄傳送的開始時間
start_time = time.time()
mqtt_msg_info = self.client.publish(topic, payload, qos=1, retain=False)
published_mid = mqtt_msg_info.mid
# 將傳送成功的訊息內容,放入client範例的 published_message 欄位
self.client.published_message[published_mid] = MQTTMessage(REQUEST_TYPE,
0,
topic,
payload,
start_time,
PUBLISH_TIMEOUT)
# 傳送成功回撥
self.client.on_publish = self.on_publish
# 斷開連線回撥
self.client.on_disconnect = self.on_disconnect
@staticmethod
def on_disconnect(client, userdata, rc):
""" broker連線斷開,放入列表容器"""
disconnected_info = [str(client._client_id), rc, datetime.datetime.now()]
disconnect_record_list.append(disconnected_info)
print("rc狀態:{} - -".format(rc), "{}-broker連線已斷開".format(str(client._client_id)))
@staticmethod
def on_publish(client, userdata, mid):
if mid:
# 記錄訊息傳送成功的時間
end_time = time.time()
# 從已傳送的訊息容器中,取出訊息
message = client.published_message.pop(mid, None)
# 計算開始傳送到傳送成功的耗時
publish_resp_time = calculate_resp_time(message.start_time, end_time)
fire_success(
request_type="p_success",
name="client_id: " + str(client._client_id),
response_time=publish_resp_time,
response_length=len(message.payload),
exception=None,
context=None
)
global total_published
# 成功傳送累加1
total_published += 1
class MQTTLocustUser(User):
tasks = [PublishTask]
wait_time = between(2, 2)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 從佇列中獲取使用者端 username 和 client_id
current_client = client_queue.get()
self.client = mqtt_client.Client(current_client[1])
self.client.username_pw_set(current_client[0], PASSWORD)
# self.client.username_pw_set(current_client[0] + "1", PASSWORD) # 模擬client連線報錯
# 定義一個容器,存放已傳送的訊息
self.client.published_message = {}
def on_start(self):
# 設定tls
context = ssl.SSLContext(ssl.PROTOCOL_TLS)
self.client.tls_set_context(context)
self.client.connect(host=BROKER_ADDRESS, port=PORT, keepalive=60)
self.client.on_connect = self.on_connect
def on_stop(self):
print("publish 成功, 當前已成功傳送數量:{}".format(total_published))
if len(disconnect_record_list) == 0:
print("無斷開連線的client")
else:
# 把斷開記錄裡的資訊寫入csv
with open("disconnect_record.csv", "w", newline='', encoding='UTF8') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(['client_id', 'rc_status', 'disconnected_time'])
for i in disconnect_record_list:
writer.writerow(i)
print("斷開連線的client資訊已寫入csv檔案")
@staticmethod
def on_connect(client, userdata, flags, rc, props=None):
if rc == 0:
print("rc狀態:{} - -".format(rc), "{}-連線broker成功".format(str(client._client_id)))
fire_success(
request_type="c_success",
name='count_connected',
response_time=0,
response_length=0,
exception=None,
context=None
)
else:
print("rc狀態:{} - -".format(rc), "{}-連線broker失敗".format(str(client._client_id)))
fire_success(
request_type="c_fail",
name="client_id: " + str(client._client_id),
response_time=0,
response_length=0,
exception=None,
context=None
)
if __name__ == '__main__':
run_single_user(MQTTLocustUser)
並行請求能力還是使用的locust
庫的能力。官方只提供了http
協定介面的相關類,沒直接提供mqtt
協定的,但是我們可以按照官方的規範,自定義相關的類,只要繼承User
和TaskSet
即可。
User類
首先是先定義User
類,這裡就是用來生成我要用來測試的車輛。
類初始化的時候,黃色框裡,會去佇列裡取出車輛資訊,用來做一些相關的設定。client
來源於from paho.mqtt import client as mqtt_client
提供的能力,固定用法,按照人家的檔案使用就行。
紅色框裡,是User
類的2個重要熟悉屬性:
tasks
: 這裡定義了生成的使用者需要去幹哪些事情,也就是對應指令碼裡的PublishTask
類下面定義的內容。wait_time
: 使用者在執行task時間隔停留的時間,可以是個區間,在裡面隨機。我這裡意思是每2s傳送一次資料到broker。綠色框裡,定義了一個字典容器,用來存放當前使用者已傳送成功的訊息內容,因為後面我要取出來把裡面相關的資料寫到生成的報表中去。
藍色框裡有2個方法,也是locust
提供的能力:
on_start
:當用戶開始執行時呼叫,這裡我做了車輛連線broker代理的處理,注意這裡需要設定tls,因為服務連線需要。
on_stop
:當用戶結束執行時呼叫,這裡我做了一些其他的處理,比如把執行期間斷開連線的車輛資訊寫到本地csv中。
TaskSet類
定義好User
類,就需要來定義TaskSet
類,你得告訴產生出來的使用者,要乾點啥。
我這根據業務需要,就是讓車輛不停的像broker傳送資料即可。
紅色部分,同樣是paho.mqtt
提供的能力,會啟動新的執行緒去執行你定義的事情。
黃色部分,就是做傳送資料的操作,並且我可以拿到一些返回,檢視原始碼就可以知道返回的是MQTTMessageInfo
類。
注意返回的2個屬性:
mid
: 返回這個訊息傳送的順序rc
: 表示傳送的響應狀態,0 就是成功綠色部分,還記得我在上面的User
類中定義了一個容器,在這裡就把傳送的訊息相關資訊放到容器中去,留著後面使用。
上面的程式碼已經用到了不少paho.mqtt
的能力,這裡再進行整體梳理下。
還用到了一些回撥函數:
另外還用到了一個事件函數events.request
。
當用戶端傳送請求時會呼叫,不管是請求成功還是請求失敗;當我需要自定義我的報告內容時,就需要用到這個event
。
檢視原始碼,知道里面要傳哪些引數,那我們在呼叫時候就需要傳入對應的引數。
比如我在傳送回撥函數裡呼叫了該方法。
所以最後在控制檯顯示的報告裡就有我定義的內容了。
由於後來在使用中發現,不知道會在什麼時候出現批次斷開的情況,於是在on_disconnect
回撥函數裡增加了對應處理,把相關的斷開資訊記錄下來,執行結束的時候寫到本地檔案裡去。
後來我主動嘗試使用者端斷開的情況測試了下檔案的寫入結果,功能正常。
後面就開始執行了,在執行過程中,開發關注鏈路服務的各項指標,這裡就不展開了,業務纏身就並沒有過多的去做這個事情,況且也不專業。確實也發現了不少問題,後面逐步優化,再繼續測試。
現在穩定執行12h,服務正常,暫時就先告一段落了。後面還有會相關其他效能測試場景,屆時就可以針對性的展開分享下了。
另外,這個指令碼分享也只是僅供參考,現在我這是使用簡單,本著能用就行,可能存在一些不合理需要優化的地方,有需要的朋友還請自行查閱相關檔案。