當前的應用都使用了前後端分離的架構,前後端系統需要協同以實現各種功能。後端系統通常負責處理業務邏輯、資料儲存和與其他服務的互動,而前端則負責使用者介面和使用者互動。而在前後端資料互動的過程中,各種異常和錯誤都有可能發生,確認異常發生時前後端系統的處理是否合理是測試驗證中非常重要的一環。
在上一篇部落格【Python】從同步到非同步多核:測試樁效能優化,加速應用的開發和驗證
中我們介紹瞭如何使用測試樁來隔離對環境的依賴,這次我們一起看看如何使用異常注入來應對聯調中的異常場景。
在系統異常中,資料庫連線失敗、第三方服務不可用等都是比較典型的場景。常見的驗證手段往往是前端的同學告知後臺同學開啟網路隔離,然後再進行驗證。
iptables
丟棄某ip封包具體操作:
$ iptables -L
Chain INPUT (policy ACCEPT)
target prot opt source destination
Chain FORWARD (policy ACCEPT)
target prot opt source destination
Chain OUTPUT (policy ACCEPT)
target prot opt source destination
# 檢查發現能 是否能正常 ping通
$ ping {資料庫/後端地址IP}
PING 1.1.1.1 (1.1.1.1) 56(84) bytes of data.
64 bytes from 1.1.1.*: icmp_seq=1 ttl=61 time=0.704 ms
64 bytes from 1.1.1.*: icmp_seq=2 ttl=61 time=0.802 ms
^C
--- 1.1.1.1 ping statistics ---
2 packets transmitted, 2 received, 0% packet loss, time 1001ms
rtt min/avg/max/mdev = 0.704/0.753/0.802/0.049 ms
$ iptables -I INPUT -p all -s {資料庫/後端地址IP} -j DROP
$ iptables -L
Chain INPUT (policy ACCEPT)
target prot opt source destination
DROP all -- 1.1.1.* anywhere
Chain FORWARD (policy ACCEPT)
target prot opt source destination
Chain OUTPUT (policy ACCEPT)
target prot opt source destination
$ ping 1.1.1.*
PING 1.1.1.1 (1.1.1.*) 56(84) bytes of data.
^C
--- 1.1.1.1 ping statistics ---
85 packets transmitted, 0 received, 100% packet loss, time 84312ms
$ iptables -F
iptables -I INPUT -s {後端IP} -m statistic --mode random --probability 0.5 -j DROP
上面這種方式能實現相關的聯調,但是有兩點可以改進的地方:
那麼有沒有侵入更小,更優雅的異常場景驗證方式呢?答案是肯定的。
更優雅的方式:寫一個mysql proxy
代理,讓後端svr 直接連線這個proxy,proxy再連線真實的mysql。
timeout
)時,直接斷開TCP連線,模擬連線DB超時場景import asyncio
# 真實mysqlIP埠
mysql_host = settings.MYSQL_HOST
mysql_port = settings.MYSQL_PORT
# 處理使用者端連線
async def handle_connection(client_reader, client_writer):
# 連線到實際的 MySQL 伺服器
mysql_reader, mysql_writer = await asyncio.open_connection(mysql_host, mysql_port)
# 轉發握手包
handshake_packet = await mysql_reader.read(4096)
client_writer.write(handshake_packet)
await client_writer.drain()
# 處理使用者端認證請求
auth_packet = await client_reader.read(4096)
mysql_writer.write(auth_packet)
await mysql_writer.drain()
# 轉發認證響應
auth_response = await mysql_reader.read(4096)
client_writer.write(auth_response)
await client_writer.drain()
# 轉發請求和響應
while True:
data = await client_reader.read(4096)
if not data:
break
sql_query = data[5:].decode('utf-8')
if "timeout" in sql_query: # sql 包含timeout 關鍵字時,直接關閉連線
await asyncio.sleep(1)
break
else:
mysql_writer.write(data)
await mysql_writer.drain()
response = await mysql_reader.read(4096)
client_writer.write(response)
await client_writer.drain()
# 關閉連線
client_writer.close()
mysql_writer.close()
async def main(host, port):
server = await asyncio.start_server(handle_connection, host, port)
async with server:
await server.serve_forever()
# 使用範例
if __name__ == "__main__":
# 本地監聽 3307, svr 連線到3307
host = "0.0.0.0"
port = 3307
asyncio.run(main(host, port))
下面的程式碼會在執行到第二條select 語句時超時:
import pymysql
connection = pymysql.connect(
host="192.168.31.76",
port=8899,
# 真實mysql 賬戶資訊
user="{user}",
password="{password}",
database="mydb",
)
curs = connection.cursor()
curs.execute("select * from user where name='bingo';")
print(curs.fetchall())
curs.execute("insert into user set name='bingtime';")
connection.commit()
curs.execute("select * from user where name='timeoutbingo';")
print(curs.fetchall())
curs.close()
connection.close()
低版本的Python沒有asyncio.run
和server.serve_forever()
需要修改main函數.
# 主函數,啟動伺服器並監聽連線
async def main(host, port):
server = await asyncio.start_server(handle_connection, host, port)
try:
# Python 3.6.5 中沒有 server.serve_forever() 方法,所以需要使用一個無限迴圈來保持伺服器執行。
# 我們使用 asyncio.sleep() 來避免阻塞事件迴圈,使其可以處理其他任務,如新連線。
while True:
await asyncio.sleep(3600) # 1 hour
except KeyboardInterrupt:
pass
finally:
server.close()
await server.wait_closed()
# 使用範例
if __name__ == "__main__":
host = "0.0.0.0"
port = 3307
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main(host, port))
except KeyboardInterrupt:
pass
finally:
loop.close()
寫一個proxy,監聽來往的SQL 語句:
import pymysql
import socket
import threading
# 設定
SERVER_ADDRESS = (settings.MYSQL_HOST, settings.MYSQL_PORT) # 真實MySQL 伺服器地址
PROXY_ADDRESS = ('0.0.0.0', 8899) # 監聽代理伺服器地址
def print_query(data):
try:
command = data[4]
if command == pymysql.constants.COMMAND.COM_QUERY:
query = data[5:].decode("utf-8")
print(f"SQL: {query}")
except Exception as e:
print(f"Error parsing packet: {e}")
def forward_data(src_socket, dst_socket, parser=None):
while True:
try:
data = src_socket.recv(4096)
if not data:
break
if parser:
parser(data)
dst_socket.sendall(data)
except OSError as e:
if e.errno == 9:
break
else:
raise
def main():
# 建立代理伺服器通訊端
proxy_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
proxy_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
proxy_socket.bind(PROXY_ADDRESS)
proxy_socket.listen(5)
print(f"Proxy server listening on {PROXY_ADDRESS}")
while True:
client_socket, client_address = proxy_socket.accept()
print(f"Client {client_address} connected")
# 連線到 MySQL 伺服器
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.connect(SERVER_ADDRESS)
try:
# 建立執行緒轉發使用者端資料到伺服器
client_to_server = threading.Thread(target=forward_data, args=(client_socket, server_socket, print_query))
client_to_server.start()
# 建立執行緒轉發伺服器資料到使用者端
server_to_client = threading.Thread(target=forward_data, args=(server_socket, client_socket))
server_to_client.start()
# 等待執行緒完成
client_to_server.join()
server_to_client.join()
finally:
client_socket.close()
server_socket.close()
print(f"Client {client_address} disconnected")
if __name__ == "__main__":
main()
如果使用上面的pymysql測試程式碼執行,會列印如下的資訊:
Client ('127.0.0.1', 57184) connected
SQL: SET AUTOCOMMIT = 0
SQL: select * from user where name='bingo';
SQL: insert into user set name='bing21211';
SQL: COMMIT
SQL: select * from user where name='bingo';
SQL: select * from user where name='bingo';
SQL: select * from user where name='timeoutbingo';
Client ('127.0.0.1', 57184) disconnected
通過實現合適的例外處理機制,可以確保使用者在遇到問題時獲得有用的反饋,驗證這些處理機制能提高系統的穩定性和安全性。iptables
功能強大但是需要手動操作,mysql proxy
代理功能直接,但是應用場景較為有限,大家可以根據實際情況進行選擇。
合抱之木,生於毫末;九層之臺,起於累土;千里之行,始於足下。