作爲軟體工程師,我有多次在要求完成指定任務時感到渾身一冷的經歷。其中有一次,我必須在一些新的硬體基礎設施和雲基礎設施之間寫一個介面,這些硬體需要 C 語言,而雲基礎設施主要是用 Python。
實現的方式之一是 用 C 寫擴充套件模組 ,Python 支援 C 擴充套件的呼叫。快速瀏覽文件後發現,這需要編寫大量的 C 程式碼。這樣做的話,在有些情況下效果還不錯,但不是我喜歡的方式。另一種方式就是將兩個任務放在不同的進程中,並使用 ZeroMQ 訊息庫 在兩者之間交換訊息。
在發現 ZeroMQ 之前,遇到這種型別的情況時,我選擇了編寫擴充套件的方式。這種方式不算太差,但非常費時費力。如今,爲了避免那些問題,我將一個系統細分爲獨立的進程,通過 通訊通訊端 發送訊息來交換資訊。這樣,不同的程式語言可以共存,每個進程也變簡單了,同時也容易偵錯。
ZeroMQ 提供了一個更簡單的過程:
編寫一小段 C 程式碼,從硬體讀取數據,然後把發現的東西作爲訊息發送出去。
使用 Python 編寫介面,實現新舊基礎設施之間的對接。
Pieter Hintjens 是 ZeroMQ 專案發起者之一,他是個擁有 有趣視角和作品 的非凡人物。
準備
本教學中,需要:
一個 C 編譯器(例如 GCC 或 Clang )
libzmq 庫
Python 3
ZeroMQ 的 Python 封裝
Fedora 系統上的安裝方法:
$ dnf install clang zeromq zeromq-devel python3 python3-zmq
Debian 和 Ubuntu 系統上的安裝方法:
$ apt-get install clang libzmq5 libzmq3-dev python3 python3-zmq
如果有問題,參考對應專案的安裝指南(上面附有鏈接)。
編寫硬體介面庫
因爲這裏針對的是個設想的場景,本教學虛構了包含兩個函數的操作庫:
fancyhw_init() 用來初始化(設想的)硬體
fancyhw_read_val() 用於返回從硬體讀取的數據
將庫的完整程式碼儲存到檔案 libfancyhw.h 中:
#ifndef LIBFANCYHW_H
#define LIBFANCYHW_H
#include <stdlib.h>
#include <stdint.h>
// This is the fictitious hardware interfacing library
void fancyhw_init(unsigned int init_param)
{
srand(init_param);
}
int16_t fancyhw_read_val(void)
{
return (int16_t)rand();
}
#endif
這個庫可以模擬你要在不同語言實現的元件間交換的數據,中間有個亂數發生器。
設計 C 介面
下面 下麪從包含管理數據傳輸的庫開始,逐步實現 C 介面。
需要的庫
開始先載入必要的庫(每個庫的作用見程式碼註釋):
// For printf()
#include <stdio.h>
// For EXIT_*
#include <stdlib.h>
// For memcpy()
#include <string.h>
// For sleep()
#include <unistd.h>
#include <zmq.h>
#include 「libfancyhw.h」
必要的參數
定義 main 函數和後續過程中必要的參數:
int main(void)
{
const unsigned int INIT_PARAM = 12345;
const unsigned int REPETITIONS = 10;
const unsigned int PACKET_SIZE = 16;
const char *TOPIC = 「fancyhw_data」;
...
初始化
所有的庫都需要初始化。虛構的那個只需要一個參數:
fancyhw_init(INIT_PARAM);
ZeroMQ 庫需要實打實的初始化。首先,定義物件 context,它是用來管理全部的通訊端的:
void *context = zmq_ctx_new();
if (!context)
{
printf(「ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s\n」, zmq_strerror(errno));
return EXIT_FAILURE;
}
之後定義用來發送數據的通訊端。ZeroMQ 支援若幹種通訊端,各有其用。使用 publish 通訊端(也叫 PUB 通訊端),可以複製訊息並分發到多個接收端。這使得你可以讓多個接收端接收同一個訊息。沒有接收者的訊息將被丟棄(即不會入訊息佇列)。用法如下:
void *data_socket = zmq_socket(context, ZMQ_PUB);
通訊端需要系結到一個具體的地址,這樣用戶端就知道要連線哪裏了。本例中,使用了 TCP 傳輸層 (當然也有 其它選項 ,但 TCP 是不錯的預設選擇):
const int rb = zmq_bind(data_socket, 「tcp://*:5555」);
if (rb != 0)
{
printf(「ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s\n」, zmq_strerror(errno));
return EXIT_FAILURE;
}
下一步, 計算一些後續要用到的值。 注意下面 下麪程式碼中的 TOPIC,因爲 PUB 通訊端發送的訊息需要系結一個主題。主題用於供接收者過濾訊息:
const size_t topic_size = strlen(TOPIC);
const size_t envelope_size = topic_size + 1 + PACKET_SIZE * sizeof(int16_t);
printf(「Topic: %s; topic size: %zu; Envelope size: %zu\n」, TOPIC, topic_size, envelope_size);
發送訊息
啓動一個發送訊息的回圈,回圈 REPETITIONS 次:
for (unsigned int i = 0; i < REPETITIONS; i++)
{
…
發送訊息前,先填充一個長度爲 PACKET_SIZE 的緩衝區。本庫提供的是 16 個位的有符號整數。因爲 C 語言中 int 型別佔用空間大小與平臺相關,不是確定的值,所以要使用指定寬度的 int 變數:
int16_t buffer[PACKET_SIZE];
for (unsigned int j = 0; j < PACKET_SIZE; j++)
{
buffer[j] = fancyhw_read_val();
}
printf(「Read %u data values\n」, PACKET_SIZE);
訊息的準備和發送的第一步是建立 ZeroMQ 訊息,爲訊息分配必要的記憶體空間。空白的訊息是用於封裝要發送的數據的:
zmq_msg_t envelope;
const int rmi = zmq_msg_init_size(&envelope, envelope_size);
if (rmi != 0)
{
printf(「ERROR: ZeroMQ error occurred during zmq_msg_init_size(): %s\n」, zmq_strerror(errno));
zmq_msg_close(&envelope);
break;
}
現在記憶體空間已分配,數據儲存在 ZeroMQ 訊息 「信封」中。函數 zmq_msg_data() 返回一個指向封裝數據快取區頂端的指針。第一部分是主題,之後是一個空格,最後是二進制數。主題和二進制數據之間的分隔符採用空格字元。需要遍歷快取區的話,使用型別轉換和 指針演算法 。(感謝 C 語言,讓事情變得直截了當。)做法如下:
memcpy(zmq_msg_data(&envelope), TOPIC, topic_size);
memcpy((void*)((char*)zmq_msg_data(&envelope) + topic_size), " ", 1);
memcpy((void*)((char*)zmq_msg_data(&envelope) + 1 + topic_size), buffer, PACKET_SIZE * sizeof(int16_t))
通過 data_socket 發送訊息:
const size_t rs = zmq_msg_send(&envelope, data_socket, 0);
if (rs != envelope_size)
{
printf(「ERROR: ZeroMQ error occurred during zmq_msg_send(): %s\n」, zmq_strerror(errno));
zmq_msg_close(&envelope);
break;
}
使用數據之前要先解除封裝:
zmq_msg_close(&envelope);
printf(「Message sent; i: %u, topic: %s\n」, i, TOPIC);
清理
C 語言不提供 垃圾收集 功能,用完之後記得要自己掃尾。發送訊息之後結束程式之前,需要執行掃尾程式碼,釋放分配的記憶體:
const int rc = zmq_close(data_socket);
if (rc != 0)
{
printf(「ERROR: ZeroMQ error occurred during zmq_close(): %s\n」, zmq_strerror(errno));
return EXIT_FAILURE;
}
const int rd = zmq_ctx_destroy(context);
if (rd != 0)
{
printf(「Error occurred during zmq_ctx_destroy(): %s\n」, zmq_strerror(errno));
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
完整 C 程式碼
儲存下面 下麪完整的介面程式碼到本地名爲 hw_interface.c 的檔案:
// For printf()
#include <stdio.h>
// For EXIT_*
#include <stdlib.h>
// For memcpy()
#include <string.h>
// For sleep()
#include <unistd.h>
#include <zmq.h>
#include 「libfancyhw.h」
int main(void)
{
const unsigned int INIT_PARAM = 12345;
const unsigned int REPETITIONS = 10;
const unsigned int PACKET_SIZE = 16;
const char *TOPIC = 「fancyhw_data」;
fancyhw_init(INIT_PARAM);
void *context = zmq_ctx_new();
if (!context)
{
printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s\n", zmq_strerror(errno));
return EXIT_FAILURE;
}
void *data_socket = zmq_socket(context, ZMQ_PUB);
const int rb = zmq_bind(data_socket, "tcp://*:5555");
if (rb != 0)
{
printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s\n", zmq_strerror(errno));
return EXIT_FAILURE;
}
const size_t topic_size = strlen(TOPIC);
const size_t envelope_size = topic_size + 1 + PACKET_SIZE * sizeof(int16_t);
printf("Topic: %s; topic size: %zu; Envelope size: %zu\n", TOPIC, topic_size, envelope_size);
for (unsigned int i = 0; i < REPETITIONS; i++)
{
int16_t buffer[PACKET_SIZE];
for (unsigned int j = 0; j < PACKET_SIZE; j++)
{
buffer[j] = fancyhw_read_val();
}
printf("Read %u data values\n", PACKET_SIZE);
zmq_msg_t envelope;
const int rmi = zmq_msg_init_size(&envelope, envelope_size);
if (rmi != 0)
{
printf("ERROR: ZeroMQ error occurred during zmq_msg_init_size(): %s\n", zmq_strerror(errno));
zmq_msg_close(&envelope);
break;
}
memcpy(zmq_msg_data(&envelope), TOPIC, topic_size);
memcpy((void*)((char*)zmq_msg_data(&envelope) + topic_size), " ", 1);
memcpy((void*)((char*)zmq_msg_data(&envelope) + 1 + topic_size), buffer, PACKET_SIZE * sizeof(int16_t));
const size_t rs = zmq_msg_send(&envelope, data_socket, 0);
if (rs != envelope_size)
{
printf("ERROR: ZeroMQ error occurred during zmq_msg_send(): %s\n", zmq_strerror(errno));
zmq_msg_close(&envelope);
break;
}
zmq_msg_close(&envelope);
printf("Message sent; i: %u, topic: %s\n", i, TOPIC);
sleep(1);
}
const int rc = zmq_close(data_socket);
if (rc != 0)
{
printf("ERROR: ZeroMQ error occurred during zmq_close(): %s\n", zmq_strerror(errno));
return EXIT_FAILURE;
}
const int rd = zmq_ctx_destroy(context);
if (rd != 0)
{
printf("Error occurred during zmq_ctx_destroy(): %s\n", zmq_strerror(errno));
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
用如下命令編譯:
$ clang -std=c99 -I. hw_interface.c -lzmq -o hw_interface
如果沒有編譯錯誤,你就可以執行這個介面了。貼心的是,ZeroMQ PUB 通訊端可以在沒有任何應用發送或接受數據的狀態下執行,這簡化了使用複雜度,因爲這樣不限制進程啓動的次序。
執行該介面:
$ ./hw_interface
Topic: fancyhw_data; topic size: 12; Envelope size: 45
Read 16 data values
Message sent; i: 0, topic: fancyhw_data
Read 16 data values
Message sent; i: 1, topic: fancyhw_data
Read 16 data values
…
…
輸出顯示數據已經通過 ZeroMQ 完成發送,現在要做的是讓一個程式去讀數據。
編寫 Python 數據處理器
現在已經準備好從 C 程式向 Python 應用傳送數據了。
庫
需要兩個庫幫助實現數據傳輸。首先是 ZeroMQ 的 Python 封裝:
$ python3 -m pip install zmq
另一個就是 struct 庫 ,用於解碼二進制數據。這個庫是 Python 標準庫的一部分,所以不需要使用 pip 命令安裝。
Python 程式的第一部分是匯入這些庫:
import zmq
import struct
重要參數
使用 ZeroMQ 時,只能向常數 TOPIC 定義相同的接收端發送訊息:
topic = 「fancyhw_data」.encode(‘ascii’)
print(「Reading messages with topic: {}」.format(topic))
初始化
下一步,初始化上下文和通訊端。使用 subscribe 通訊端(也稱爲 SUB 通訊端),它是 PUB 通訊端的天生伴侶。這個通訊端發送時也需要匹配主題。
with zmq.Context() as context:
socket = context.socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:5555")
socket.setsockopt(zmq.SUBSCRIBE, topic)
i = 0
...
接收訊息
啓動一個無限回圈,等待接收發送到 SUB 通訊端的新訊息。這個回圈會在你按下 Ctrl+C 組合鍵或者內部發生錯誤時終止:
try:
while True:
... # we will fill this in next
except KeyboardInterrupt:
socket.close()
except Exception as error:
print("ERROR: {}".format(error))
socket.close()
這個回圈等待 recv() 方法獲取的新訊息,然後將接收到的內容從第一個空格字元處分割開,從而得到主題:
binary_topic, data_buffer = socket.recv().split(b’ ', 1)
解碼訊息
Python 此時尚不知道主題是個字串,使用標準 ASCII 編解碼器進行解碼:
topic = binary_topic.decode(encoding = ‘ascii’)
print(「Message {:d}:」.format(i))
print("\ttopic: ‘{}’".format(topic))
下一步就是使用 struct 庫讀取二進制數據,它可以將二進制數據段轉換爲明確的數值。首先,計算數據包中數值的組數。本例中使用的 16 個位的有符號整數對應的是 struct 格式字元 中的 h:
packet_size = len(data_buffer) // struct.calcsize(「h」)
print("\tpacket size: {:d}".format(packet_size))
知道數據包中有多少組數據後,就可以通過構建一個包含數據組數和數據型別的字串,來定義格式了(比如「16h」):
struct_format = 「{:d}h」.format(packet_size)
將二進制數據串轉換爲可直接列印的一系列數位:
data = struct.unpack(struct_format, data_buffer)
print("\tdata: {}".format(data))
完整 Python 程式碼
下面 下麪是 Python 實現的完整的接收端:
#! /usr/bin/env python3
import zmq
import struct
topic = 「fancyhw_data」.encode(‘ascii’)
print(「Reading messages with topic: {}」.format(topic))
with zmq.Context() as context:
socket = context.socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:5555")
socket.setsockopt(zmq.SUBSCRIBE, topic)
i = 0
try:
while True:
binary_topic, data_buffer = socket.recv().split(b' ', 1)
topic = binary_topic.decode(encoding = 'ascii')
print("Message {:d}:".format(i))
print("\ttopic: '{}'".format(topic))
packet_size = len(data_buffer) // struct.calcsize("h")
print("\tpacket size: {:d}".format(packet_size))
struct_format = "{:d}h".format(packet_size)
data = struct.unpack(struct_format, data_buffer)
print("\tdata: {}".format(data))
i += 1
except KeyboardInterrupt:
socket.close()
except Exception as error:
print("ERROR: {}".format(error))
socket.close()
將上面的內容儲存到名爲 online_analysis.py 的檔案。Python 程式碼不需要編譯,你可以直接執行它。
執行輸出如下:
$ ./online_analysis.py
Reading messages with topic: b’fancyhw_data’
Message 0:
topic: ‘fancyhw_data’
packet size: 16
data: (20946, -23616, 9865, 31416, -15911, -10845, -5332, 25662, 10955, -32501, -18717, -24490, -16511, -28861, 24205, 26568)
Message 1:
topic: ‘fancyhw_data’
packet size: 16
data: (12505, 31355, 14083, -19654, -9141, 14532, -25591, 31203, 10428, -25564, -732, -7979, 9529, -27982, 29610, 30475)
…
…
小結
本教學介紹了一種新方式,實現從基於 C 的硬體介面收集數據,並分發到基於 Python 的基礎設施的功能。藉此可以獲取數據供後續分析,或者轉送到任意數量的接收端去。它採用了一個訊息庫實現數據在發送者和處理者之間的傳送,來取代同樣功能規模龐大的軟體。
本教學還引出了我稱之爲「軟體粒度」的概念,換言之,就是將軟體細分爲更小的部分。這種做法的優點之一就是,使得同時採用不同的程式語言實現最簡介面作爲不同部分之間溝通的元件成爲可能。
實踐中,這種設計使得軟體工程師能以更獨立、合作更高效的方式做事。不同的團隊可以專注於數據分析的不同方面,可以選擇自己中意的實現工具。這種做法的另一個優點是實現了零代價的並行,因爲所有的進程都可以並行執行。 ZeroMQ 訊息庫 是個令人讚歎的軟體,使用它可以讓工作大大簡化。