modbus_tcp 協定是工業專案中常見的一種基於 TCP/IP 協定的裝置資料互動協定。
作為 TCP/IP 協定的上層協定,modbus_tcp 協定涉及到兩個概念:client 和 server。但更標準的叫法應該是 master 和 slave。
而 modbus_tk 庫作為 Python 中著名的 modbus 協定封裝模組,其原始碼值得深入研究。
特別是在對並行量等方面有一定要求的情況下,如果需要在 modbus_tk 模組的基礎上進行更進一步的開發,就更應該仔細研究其原始碼和實現邏輯。
因此,我寫下了這篇文章,希望對你有所幫助。
匯入 TcpMaster 類:
from modbus_tk.modbus_tcp import TcpMaster
TcpMaster 繼承於 Master,在其範例化的時候什麼也沒做。
class TcpMaster(Master):
def __init__(self, host="127.0.0.1", port=502, timeout_in_sec=5.0):
super(TcpMaster, self).__init__(timeout_in_sec)
self._host = host
self._port = port
self._sock = None
Master 的 __init__() 方法中也沒有做什麼:
class Master(object):
def __init__(self, timeout_in_sec, hooks=None):
self._timeout = timeout_in_sec
self._verbose = False
self._is_opened = False # 記住 _is_opened 現在為 False
TcpMaster 的父類別 Master 提供了 execute 方法,該方法提供以下引數:
self,
slave,
function_code,
starting_address,
quantity_of_x=0,
output_value=0,
data_format="",
expected_length=-1,
write_starting_address_fc23=0,
number_file=None,
pdu="",
returns_raw=False
此方法基本上算該模組的核心,無論是讀寫線圈、還是讀寫暫存器等都是呼叫該方法。
接下來其程式碼體的具體實現,我們將開始進行逐行分析:
is_read_function = False
nb_of_digits = 0
if number_file is None:
number_file = tuple()
self.open()
is_read_function 這裡賦值為 False、代表後續在 Master.execute() 方法真正執行前,作者會先認為使用者呼叫的是 write 方法而非 read 方法。
接下來程式碼中又呼叫了 self.open() 方法。 由於範例化 TcpMaster 類時什麼也沒做, 所以 TCP 連結在此時是還沒有建立的,而 self.open() 方法就是建立一個 TCP 的 client 端。
def open(self):
if not self._is_opened: # 在初始化方法中,它預設是 False
self._do_open()
self._is_opened = True
這裡執行的 self._do_open() 方法由 TcpMaster 實現:
def _do_open(self):
if self._sock: # 如果 self._sock 不是 None、就將 socket 物件關閉
self._sock.close()
# 建立一個 socket 物件,AF_INET 為 IPV4 地址家族
# SOCK_STREAM 即為基於流的協定,也就是 TCP 協定
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 設定超時時間,即範例化 TcpMaster 傳入的值,預設引數為 5
self.set_timeout(self.get_timeout())
# 允許重用地址(解決埠佔用問題)
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
call_hooks("modbus_tcp.TcpMaster.before_connect", (self, ))
# 進行連結
self._sock.connect((self._host, self._port))
call_hooks("modbus_tcp.TcpMaster.after_connect", (self, ))
這裡的 self.set_timeout 由 TcpMaster 實現:
def set_timeout(self, timeout_in_sec):
super(TcpMaster, self).set_timeout(timeout_in_sec)
if self._sock:
# 注意! 這裡如果 timeout_in_sec 等於 0
# 那麼該 sock 物件就是一個連結時非阻塞的
# 可用於 I/O 多路複用
self._sock.setblocking(timeout_in_sec > 0)
# 如果 timeout_in_sec 為 0,則設定為阻塞的 socket 物件
# timeout 不應該傳遞負數
if timeout_in_sec:
self._sock.settimeout(timeout_in_sec)
看到這裡,我們其實不難猜出 modbus_tk 模組中 TcpMaster 的 Master.execute() 方法其實是能支援 self._sock 異常後的無感重聯的。
只需要在 slave 方失聯後重新呼叫一次 TcpMaster._do_open() 方法即可,即可實現無感知的重新連結。
接下來 Master.execute() 方法基本是對 TCP 協定的解包、組包程式碼,我將具體的組包等過程程式碼都先給註釋掉了:
@threadsafe_function
def execute(
self, slave, function_code, starting_address, quantity_of_x=0, output_value=0, data_format="",
expected_length=-1, write_starting_address_fc23=0, number_file=None, pdu="", returns_raw=False
):
is_read_function = False
nb_of_digits = 0
if number_file is None:
number_file = tuple()
self.open()
if function_code == defines.READ_COILS or function_code == defines.READ_DISCRETE_INPUTS:
pass
elif function_code == defines.READ_INPUT_REGISTERS or function_code == defines.READ_HOLDING_REGISTERS:
pass
elif function_code == defines.READ_FILE_RECORD:
pass
elif (function_code == defines.WRITE_SINGLE_COIL) or (function_code == defines.WRITE_SINGLE_REGISTER):
pass
elif function_code == defines.WRITE_MULTIPLE_COILS:
pass
elif function_code == defines.WRITE_MULTIPLE_REGISTERS:
pass
elif function_code == defines.READ_EXCEPTION_STATUS:
pass
elif function_code == defines.DIAGNOSTIC:
pass
elif function_code == defines.READ_WRITE_MULTIPLE_REGISTERS:
pass
elif function_code == defines.RAW:
pass
elif function_code == defines.DEVICE_INFO:
pass
else:
raise ModbusFunctionNotSupportedError("The {0} function code is not supported. ".format(function_code))
query = self._make_query()
request = query.build_request(pdu, slave)
retval = call_hooks("modbus.Master.before_send", (self, request))
if retval is not None:
request = retval
if self._verbose:
LOGGER.debug(get_log_buffer("-> ", request))
self._send(request)
call_hooks("modbus.Master.after_send", (self, ))
if slave != 0:
pass
為了能夠繼續向下分析,我們這裡先以寫入多個暫存器的邏輯入手接著向下看:
READ_WRITE_MULTIPLE_REGISTERS
其程式碼為:
elif function_code == defines.WRITE_MULTIPLE_REGISTERS:
# 輸出值和 format,如果指定了 format 和輸出值,將執行下面的邏輯
if output_value and data_format:
byte_count = struct.calcsize(data_format)
# 否則先計算整個 byte 的長度
else:
byte_count = 2 * len(output_value)
# 使用 struct 對資料進行轉換, 採用大端排列
pdu = struct.pack(">BHHB", function_code, starting_address, byte_count // 2, byte_count)
# 輸出值和 format,如果指定了 format 和輸出值,將執行下面的邏輯
if output_value and data_format:
pdu += struct.pack(data_format, *output_value)
# 一般我們不會指定 data_format,所以直接往下看
else:
for j in output_value:
# 若 j 大於 0 fmt 就是 H 否則是 h
fmt = "H" if j >= 0 else "h"
# 繼續向 pdu 里加資料
pdu += struct.pack(">" + fmt, j)
data_format = ">HH"
if expected_length < 0:
expected_length = 8
無論是讀取、寫入線圈或者暫存器,每一個請求都會包含一個 pdu 資料單元。
在 Master.execute() 方法下面,每一種讀寫操作都會執行 TcpMaster._make_query() 方法:
query = self._make_query()
request = query.build_request(pdu, slave)
下面是 TcpMaster._make_query 的程式碼:
def _make_query(self):
return TcpQuery()
TcpQuery 屬於 Query 的派生類,但 Query 實際上是一個 interface 類,故沒有實際程式碼:
class TcpQuery(Query):
_last_transaction_id = 0 # 記住這個類屬性
def __init__(self):
super(TcpQuery, self).__init__()
self._request_mbap = TcpMbap()
self._response_mbap = TcpMbap()
TcpMbap 類的範例化過程也非常簡單,TcpQuery 中實際上就是封裝了一個 request 和 response 而已:
class TcpMbap(object):
def __init__(self):
self.transaction_id = 0
self.protocol_id = 0
self.length = 0
self.unit_id = 0
TcpQuery.build_request() 的實現:
def build_request(self, pdu, slave):
if (slave < 0) or (slave > 255):
raise InvalidArgumentError("{0} Invalid value for slave id".format(slave))
self._request_mbap.length = len(pdu) + 1 # pdu 資料單元的長度 + 1
self._request_mbap.transaction_id = self._get_transaction_id() # 獲取一個事務 id
self._request_mbap.unit_id = slave # 站號
mbap = self._request_mbap.pack() # 打包
# mbap 和 pdu 資料單元拼接並返回
# mbap 可以認為是 head 而 pdu 則是 body
return mbap + pdu
TcpQuery._get_transaction_id() 會在每次收發包時,都讓事務號自增 1,當事務號增加到 65535 後,置 0:
@threadsafe_function
def _get_transaction_id(self):
if TcpQuery._last_transaction_id < 0xffff: # 65535
TcpQuery._last_transaction_id += 1
else:
TcpQuery._last_transaction_id = 0
return TcpQuery._last_transaction_id
TcpMbap.pack() 方法會將所有 TcpMbap.__init__() 中的範例屬性通過 struct 進行封包:
def pack(self):
# transaction_id 事務號
# protocol_id 0
# length pdu 資料單元的長度 + 1
# unit_id 裝置站號 slave
return struct.pack(">HHHB", self.transaction_id, self.protocol_id, self.length, self.unit_id)
至此,request 請求已經構建完畢了。
讓我們接著回到 Master.execute() 方法中:
# call_hooks 實際上是執行勾點函數,在後面會有詳細介紹
retval = call_hooks("modbus.Master.before_send", (self, request))
if retval is not None:
request = retval
# 是否需要列印更多的紀錄檔?這個可以通過 Master.set_verbose() 方法進行設定
# 其預設值為 False
if self._verbose:
LOGGER.debug(get_log_buffer("-> ", request))
# 傳送請求
self._send(request)
call_hooks("modbus.Master.after_send", (self, ))
在 TcpMaster._send() 方法中:
def _send(self, request):
retval = call_hooks("modbus_tcp.TcpMaster.before_send", (self, request))
if retval is not None:
request = retval
try:
# 重新整理 socket 確保連結可用
flush_socket(self._sock, 3)
except Exception as msg:
LOGGER.error('Error while flushing the socket: {0}'.format(msg))
# 異常後、將再次執行 TcpMaster._do_open() 嘗試重聯
self._do_open()
# 若 flush_socket() 函數執行沒有丟擲異常,則代表連結是可用的。
# 這時候才會傳送資料
self._sock.send(request)
flush_socket() 函數非常有趣,它通過 select 模組來不斷的輪詢監聽 sock 物件的可讀狀態,當可讀時會自動讀取每一次的 1024 個位元組資料並將他們拋棄,這裡是為了保持傳送資料前的連線狀態檢測沒有異常而做的一步操作:
def flush_socket(socks, lim=0):
# lim 傳入的是 3, 代表最多讀 3 次
input_socks = [socks] # 做成一個監聽列表
cnt = 0 # 當前讀取到的次數
while True:
# 放入 可讀事件列表、可寫事件列表、錯誤事件列表 及監聽物件
# 它會返回一個列表:
# [[r_fd, r_fd], [w_fd, w_fd], [e_fd, e_fd]]
# 而 [0] 則是指只拿到可讀的檔案描述符列表
# 迴圈事件時間設定的是 0.0 這代表它將一直阻塞在這裡,直到 fd 事件被觸發
# 若不為 0,則等待 n 秒,進行下一次的迴圈
i_socks = select.select(input_socks, input_socks, input_socks, 0.0)[0]
# 沒有可讀的檔案描述符,則跳出 while 迴圈
if len(i_socks) == 0:
break
# 若拿到了,就回圈得到 socks 進行 recv
# 其實這裡應該也可以寫成 i_socks[0].recv(1024)
# 因為可讀事件檔案描述符
for sock in i_socks:
sock.recv(1024)
# 超出了最大讀取限制, 這裡應該代表的是連線斷開了
if lim > 0:
cnt += 1
if cnt >= lim:
raise Exception("flush_socket: maximum number of iterations reached")
至此、我們一次完整的組包及傳送資料的原始碼分析就走完了。
我們接著來看 Master.execute() 方法中關於解析響應資訊的程式碼:
if slave != 0:
response = self._recv(expected_length)
pass
首先,如果站號不等於 0 就會執行 TcpMaster._recv() 方法:
def _recv(self, expected_length=-1):
# to_data 函數會根據 Python 版本來返回不同的內容
# 若是 Python2 則直接返回 string ''
# 若是 Python3 則會返回一個 bytearray('', 'ascii')
response = to_data('')
length = 255
# 如果 response 小於 255, 則不斷的讀取
while len(response) < length:
rcv_byte = self._sock.recv(1)
if rcv_byte:
response += rcv_byte
# 在第 6 個位元組處、通過 struct.unpack() 進行拆包
if len(response) == 6:
to_be_recv_length = struct.unpack(">HHH", response)[2]
length = to_be_recv_length + 6
else:
break
retval = call_hooks("modbus_tcp.TcpMaster.after_recv", (self, response))
if retval is not None:
return retval
return response
得到 response 後,Master.execute() 方法會開始解析響應資訊:
retval = call_hooks("modbus.Master.after_recv", (self, response))
if retval is not None:
response = retval
if self._verbose:
LOGGER.debug(get_log_buffer("<- ", response))
response_pdu = query.parse_response(response)
TcpQuery.parse_response() 方法的程式碼主要將 mbap 和 pdu 進行分離,並且通過 TcpMbap.unpack() 方法將 mbap 解包並通過 TcpMbap.check_response() 進行資料校驗:
def parse_response(self, response):
if len(response) > 6:
# 分別拿到 mbap 和 pdu
mbap, pdu = response[:7], response[7:]
# 解包
self._response_mbap.unpack(mbap)
# 校驗資料,傳入請求的 mbap 以及 pdu 的長度
self._response_mbap.check_response(self._request_mbap, len(pdu))
# 返回 pdu
return pdu
else:
raise ModbusInvalidResponseError("Response length is only {0} bytes. ".format(len(response)))
TcpMbap.unpack() 方法程式碼如下,將 _response_mbap 的事務號協定 id 等資訊進行更新:
def unpack(self, value):
(self.transaction_id, self.protocol_id, self.length, self.unit_id) = struct.unpack(">HHHB", value)
TcpMbap.check_response() 方法程式碼如下:
def check_response(self, request_mbap, response_pdu_length):
error_str = self._check_ids(request_mbap)
error_str += self.check_length(response_pdu_length)
if len(error_str) > 0:
raise ModbusInvalidMbapError(error_str)
TcpMbap._check_ids() 方法程式碼如下:
def _check_ids(self, request_mbap):
# self 是響應體, request_mbap 是請求體
# 對比他們的事務號等資訊是否一致,若不一致則會在返回一個 error_str, 該 error_str 會在 TcpMbap.check_response()
# 中被 raise
error_str = ""
if request_mbap.transaction_id != self.transaction_id:
error_str += "Invalid transaction id: request={0} - response={1}. ".format(
request_mbap.transaction_id, self.transaction_id)
if request_mbap.protocol_id != self.protocol_id:
error_str += "Invalid protocol id: request={0} - response={1}. ".format(
request_mbap.protocol_id, self.protocol_id
)
if request_mbap.unit_id != self.unit_id:
error_str += "Invalid unit id: request={0} - response={1}. ".format(request_mbap.unit_id, self.unit_id)
return error_str
TcpMbap.check_length() 方法程式碼如下:
def check_length(self, pdu_length):
# 這裡思考 pdu 長度為什麼 + 1?
# 因為 response 在 TcpMbap.unpack() 方法中,self.length 是 mbap + pdu 的長度
# 所以這裡 pdu_length 長度 + 1 實際上就是指整個 head + body 的長度
following_bytes_length = pdu_length+1
# 判斷長度是否相等、若不等可能造成的原因是資料拆包不正確 mbap 長了,或者 pdu 短了
# 這種時候就直接返回一個字串
# TcpMbap.check_response() 中如果 error_str 的長度大於 0, 就會丟擲異常了
if self.length != following_bytes_length:
return "Response length is {0} while receiving {1} bytes. ".format(self.length, following_bytes_length)
return ""
至此,TcpQuery().parse_response() 方法就全部執行完畢了。
Master.execute() 方法中就得到了資料單元 pdu。也就是整個資料體。
我們接著往下看 Master.execute() 方法,其實後面已經沒有再深層次呼叫某些內部程式碼了,也沒有新的 I/O 操作了:
response_pdu = query.parse_response(response)
(return_code, byte_2) = struct.unpack(">BB", response_pdu[0:2])
# 如果返回的 code 大於 128,直接報錯
if return_code > 0x80:
# the slave has returned an error
exception_code = byte_2
raise ModbusError(exception_code)
else:
# 下面都是解析出一個 body 和一個 data_format
# 分別是 讀操作、裝置資訊、寫操作
# 他們所得到的 body 都不一樣
if is_read_function:
byte_count = byte_2
data = response_pdu[2:]
if byte_count != len(data):
# the byte count in the pdu is invalid
raise ModbusInvalidResponseError(
"Byte count is {0} while actual number of bytes is {1}. ".format(byte_count, len(data))
)
elif function_code == defines.DEVICE_INFO:
data = response_pdu[1:]
data_format = ">" + (len(data) * "B")
else:
# returns what is returned by the slave after a writing function
data = response_pdu[1:]
# 預設為 False
if returns_raw:
return data
# 解包,通過 讀、寫、裝置資訊所得到的 data_format 和 data
# 對資料進行操作
result = struct.unpack(data_format, data)
# 只有 function_code 是 READ_COILS 時,nb_of_digits 才不為 0
if nb_of_digits > 0:
digits = []
for byte_val in result:
for i in range(8):
if len(digits) >= nb_of_digits:
break
digits.append(byte_val % 2)
byte_val = byte_val >> 1
result = tuple(digits)
# 如果 function_code 是 READ_FILE_RECORD 讀取檔案記錄,則也需要對 result 進行
# 再次的修改
if function_code == defines.READ_FILE_RECORD:
sub_seq = list()
ptr = 0
while ptr < len(result):
sub_seq += ((ptr + 2, ptr + 2 + result[ptr] // 2), )
ptr += result[ptr] // 2 + 2
result = tuple(map(lambda sub_seq_x: result[sub_seq_x[0]:sub_seq_x[1]], sub_seq))
# 返回 result
return result
threadsafe 是一個裝飾器函數,在 Master.execute() 方法頭上和 TcpQuery._get_transaction_id() 方法頭上都加了這個裝飾器。
見名知意,該裝飾器的主要目的就是為了保障執行緒安全(有的裝置可能不支援同時對其進行讀寫操作),但是該裝飾器也可能會帶來另一些問題。
我們先看它的原始碼:
def threadsafe_function(fcn):
# 範例化出了一把遞迴鎖
lock = threading.RLock()
def new(*args, **kwargs):
# 當 Master.execute() 和 TcpQuery._get_transaction_id() 方法沒有通過
# 關鍵字傳參傳入 threadsafe=False 時,將預設開啟執行緒安全模式來執行
# 這 2 個方法
threadsafe = kwargs.pop('threadsafe', True)
if threadsafe:
lock.acquire()
try:
ret = fcn(*args, **kwargs)
except Exception as excpt:
raise excpt
finally:
if threadsafe:
lock.release()
return ret
return new
這個 threading lock 會導致什麼問題呢?當 Python 直譯器執行到 Master.execute() 方法頭上時,就會自動執行該裝飾器。
而 lock 變數也就生成了,最後會返回內部閉函數 new()。
可以理解為這個 lock 已經被當成了一個全域性變數,後續無論是建立多少個 TcpMaster 的範例物件,lock 變數所指向的鎖都是同一個。
通過原始碼分析我們得知,Master.execute() 方法中會去建立 socket 連結,一旦有 1 個 device 連結時間過長,也將會導致其他的 device 通訊或連結阻塞。
因為它們都是用的同一個 lock 鎖。所以,一般來說在使用時我們會在 Master.execute() 方法中顯式的傳遞 threadsafe=False 的關鍵字引數,自己實現 lock 來解決同一 device 不能同時讀寫的問題。
在上面分析原始碼時,我們會看到很多 call_hooks 的執行,他們其實是 modbus_tk 模組所提供的勾點函數。只要實現相應的勾點函數就會在整個 modbus_tcp 的資料傳遞生命週期中自動執行。
以下是常見的勾點函數:
def install_hook(name, fct):
"""
Install one of the following hook
modbus_rtu.RtuMaster.before_open((master,))
modbus_rtu.RtuMaster.after_close((master,)
modbus_rtu.RtuMaster.before_send((master, request)) returns modified request or None
modbus_rtu.RtuMaster.after_recv((master, response)) returns modified response or None
modbus_rtu.RtuServer.before_close((server, ))
modbus_rtu.RtuServer.after_close((server, ))
modbus_rtu.RtuServer.before_open((server, ))
modbus_rtu.RtuServer.after_open(((server, ))
modbus_rtu.RtuServer.after_read((server, request)) returns modified request or None
modbus_rtu.RtuServer.before_write((server, response)) returns modified response or None
modbus_rtu.RtuServer.after_write((server, response))
modbus_rtu.RtuServer.on_error((server, excpt))
modbus_tcp.TcpMaster.before_connect((master, ))
modbus_tcp.TcpMaster.after_connect((master, ))
modbus_tcp.TcpMaster.before_close((master, ))
modbus_tcp.TcpMaster.after_close((master, ))
modbus_tcp.TcpMaster.before_send((master, request))
modbus_tcp.TcpServer.after_send((master, request))
modbus_tcp.TcpMaster.after_recv((master, response))
modbus_tcp.TcpServer.on_connect((server, client, address))
modbus_tcp.TcpServer.on_disconnect((server, sock))
modbus_tcp.TcpServer.after_recv((server, sock, request)) returns modified request or None
modbus_tcp.TcpServer.before_send((server, sock, response)) returns modified response or None
modbus_tcp.TcpServer.on_error((server, sock, excpt))
modbus_rtu_over_tcp.RtuOverTcpMaster.after_recv((master, response))
modbus.Master.before_send((master, request)) returns modified request or None
modbus.Master.after_send((master))
modbus.Master.after_recv((master, response)) returns modified response or None
modbus.Slave.handle_request((slave, request_pdu)) returns modified response or None
modbus.Slave.handle_write_multiple_coils_request((slave, request_pdu))
modbus.Slave.handle_write_multiple_registers_request((slave, request_pdu)) returns modified response or None
modbus.Slave.handle_write_single_register_request((slave, request_pdu)) returns modified response or None
modbus.Slave.handle_write_single_coil_request((slave, request_pdu)) returns modified response or None
modbus.Slave.handle_read_input_registers_request((slave, request_pdu)) returns modified response or None
modbus.Slave.handle_read_holding_registers_request((slave, request_pdu)) returns modified response or None
modbus.Slave.handle_read_discrete_inputs_request((slave, request_pdu)) returns modified response or None
modbus.Slave.handle_read_coils_request((slave, request_pdu)) returns modified response or None
modbus.Slave.handle_read_write_multiple_registers_request((slave, request_pdu)) returns modified response or None
modbus.Slave.handle_read_exception_status_request((slave, request_pdu)) returns modified response or None
modbus.Slave.on_handle_broadcast((slave, response_pdu)) returns modified response or None
modbus.Slave.on_exception((slave, function_code, excpt))
modbus.Databank.on_error((db, excpt, request_pdu))
modbus.ModbusBlock.setitem((self, slice, value))
modbus.Server.before_handle_request((server, request)) returns modified request or None
modbus.Server.after_handle_request((server, response)) returns modified response or None
modbus.Server.on_exception((server, excpt))
"""
with _LOCK:
try:
_HOOKS[name].append(fct)
except KeyError:
_HOOKS[name] = [fct]