混合程式設計python與C++

2023-06-08 06:01:33

上個版本: 只是用到ctypes進行傳輸, 這次將python伺服器端更改為C++伺服器端,方便後續維護.
本文實現功能: python傳輸圖片給C++, C++接受圖片後對圖片進行處理,並將結果返回給python使用者端, pass image from python to C++

C++ 伺服器端

.h檔案

注意文中的model

// .h
#pragma once
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <signal.h>
#include <opencv2/opencv.hpp>

using namespace std;
using namespace cv;

class ModelManager;

class ServerManager
{
private:
	int m_port;
	char *m_addr;
	cv::VideoCapture m_cap;
	int m_server;
	int m_accept; // client conn
public:
	bool initialization(const int &port, const cv::VideoCapture &cap, char *addr = nullptr);
	bool initialization(const int &port, char *addr = nullptr);
	bool build_connect();
	bool acceptClient();
	void error_print(const char *ptr);
	bool free_connect();

	bool send_data_frame(ModelManager& model);
	bool receive_data_frame(cv::Mat &frame, ModelManager& model);
};

.cpp檔案

#include "ServerManager.h"
#include "ModelManager.h"
#define BUFFER_SIZE 65538

void ServerManager::error_print(const char * ptr) {
        perror(ptr);
        exit(EXIT_FAILURE);
}

bool ServerManager::initialization(const int& port, const cv::VideoCapture& cap, char* addr){
    m_port = htons(port);
	m_addr = addr;
	m_cap = cap;
	return true;
}

bool ServerManager::initialization(const int& port, char* addr){
    m_port = htons(port);
    m_addr = addr;
	return true;
}

bool ServerManager::build_connect() {
	struct sockaddr_in server_addr;
    bzero(&server_addr,sizeof(server_addr));
	server_addr.sin_family = AF_INET;
	server_addr.sin_addr.s_addr = m_addr?inet_addr(m_addr):INADDR_ANY;  
	server_addr.sin_port = m_port;
	// create socket 
	m_server = socket(AF_INET, SOCK_STREAM, 0);
	if(m_server < 0)
        error_print("socket bind error");
	// can reuse port
    int on = 1;
    if(setsockopt(m_server,SOL_SOCKET,SO_REUSEADDR,&on,sizeof(on)) < 0)
        error_print("setsockopt error");
    // bind addr
    if(bind(m_server, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0)
        error_print("bind error");
    // listen only one client
    if(listen(m_server, 1) < 0)
        error_print("listen failed");
    
	cout << "ServerManager is listening, plesae wait..." << endl;
	
	return true;
}

bool ServerManager::acceptClient(){
	struct sockaddr_in accept_addr;
	socklen_t accept_len = sizeof(accept_addr);
	bzero(&accept_addr,sizeof(accept_addr));
	// accept client connection
    if((m_accept = accept(m_server,(struct sockaddr*)&accept_addr,&accept_len)) < 0)
        error_print("accept error");
	std::cout << "Connection established" << std::endl;
	return true;
}

bool ServerManager::send_data_frame(ModelManager& model) {
	char *json_output = nullptr;
	json_output = model.createJson();
	if (json_output == nullptr) {
		return false;
	}
	
	// printf("send data %s\n", json_output);
	// just send json_output, dont memcpy new char*!!! it wastes me two hours
	// send json
	int result = send(m_accept, json_output, strlen(json_output), 0);
	if (result == -1) {
		cout << "send fail" << endl;
		return false;
	}
	return true;
}


bool ServerManager::receive_data_frame(Mat& frame, ModelManager& model) {
	// recv frame size
	int data_size;
	if (recv(m_accept, &data_size, sizeof(data_size), 0) != sizeof(data_size)) {
		// when client close, then close connection
		close(m_accept);
		cout << "close connection to client" << endl;
		acceptClient(); // restart a new accept, to accept new connection
		return false;
	}
	cout << data_size << endl;
	// recv frame data
    
	// char buf[data_size];
	// std::vector<uchar> decode;
	// int bytes_received = 0;
	// do
	// {
	// 	int nBytes = recv(m_accept, buf, data_size - bytes_received, 0);
	// 	for (int i = 0; i < nBytes; i++)  // maybe can use memcpy, maybe faster
	// 	{
	// 		decode.emplace_back(buf[i]);
	// 	}
	// 	cout << bytes_received << endl;
	// 	bytes_received += nBytes;
	// } while (bytes_received < data_size);
    char *recv_char = new char[data_size];
	std::vector<uchar> decode(data_size, 0);
	int index = 0;
	int bytes_received = 0;
	int count = data_size;
	while (count > 0)// if count >= 0, dead loop
	{
		int iRet = recv(m_accept, recv_char, count, 0);
		if (index >= data_size) index = data_size;
		memcpy(&decode[index], recv_char , iRet);
		index += iRet;
		if (!iRet) { return -1; }
		count -= iRet;
	}
	// decode message
	frame = imdecode(decode, cv::IMREAD_COLOR);
	// push into Model's queueMat
	model.mtxQueueImg.lock();
	model.queueMat.push(frame);
	model.mtxQueueImg.unlock();
	return true;
}

bool ServerManager::free_connect() {
	m_cap.release();
	close(m_accept);
    close(m_server);
	return true;
}

C++ model部分程式碼

.h檔案

#pragma once
#include "CV_Classify.h"
#include "CV_Detect.h"
#include "ServerManager.h"
#include <opencv2/opencv.hpp>
#include <mutex>
#include <queue>
#include <unistd.h> // usleep
#include <thread>
#include "cJSON.h"
#include <string>

using namespace std;
using namespace cv;

class ModelManager{
public:
    Detect objdetect;
    Classify objclassify;
    std::mutex mtxQueueDet;  // mutex for detect queue
    std::mutex mtxQueueImg;  // mutex for image queue
    std::mutex mtxQueueCls;  // mutex for classify queue
    std::queue<cv::Mat> queueMat;
    std::queue<ObjDetectOutput> queueDetOut;// Detect queue
    std::queue<ObjClassifyOutput> queueClsOut;// Classify queue
    
    bool DetectFlag = true;
    bool ClassifyFlag = true;
    bool empty_flag = false;
    friend class ServerManager;
public:
    void initDetectModel() ;
    void initClassifyModel() ;
    void DetectImg();
    void ClassifyImg();
    void getClsResult(ObjClassifyOutput &output);
    // ObjClassifyOutput getClsResult();
    char* createJson();
};

.cpp檔案

部分有刪減,createJson可參考使用,利用json來傳遞值

#include "ModelManager.h"

void ModelManager::initDetectModel() 
{
    std::string config_path = "DetectConfig.yaml";
    objdetect.Init(config_path, 1);
}
void ModelManager::initClassifyModel() 
{
    std::string config_path = "ClassiflyConfig.yaml";
    objclassify.Init(config_path, 1);
}

void ModelManager::DetectImg()
{
    DetectInput detect_input;
    DetectOutput detect_output;
    cv::Mat frame;
    size_t mm = 0;
    while(1)
    { 
        if (queueMat.empty()) 
        {
            if(!DetectFlag)
            {
                break;
            }
			usleep(2000);
            continue;
		}
        // get image from queueMat
        mtxQueueImg.lock();
        frame = queueMat.front();
        queueMat.pop();
        mtxQueueImg.unlock();
        // run model
        objdetect.Run(detect_input, detect_output);
        // push detect result into queueDetOut
        mtxQueueDet.lock();
        queueDetOut.push(detect_output);
        // cout << "detect run !!" << endl;
        mtxQueueDet.unlock();
    }
    return;
}
void ModelManager::ClassifyImg()
{
    ObjClassifyInput input;
    ObjClassifyOutput output;
    cv::Mat frame;
    Detoutput detect_result;
    while(1)
    {
        if (queueDetOut.empty()) 
        {
            if(!ClassifyFlag)
            {
                break;
            }
			usleep(2000);
            continue;
		}
        // get detect from queueDetOut
        mtxQueueDet.lock();
        detect_result = queueDetOut.front();
        queueDetOut.pop();
        mtxQueueDet.unlock();
        // run model
        objclassify.Run(input, output);
        // push cls result into queueClsOut
        mtxQueueCls.lock();
        queueClsOut.push(output);
        mtxQueueCls.unlock();  
    }
    return;
}

void ModelManager::getClsResult(ObjClassifyOutput& output){
    if (queueClsOut.empty()){
        output.object_list.object_num = -1;  // -1 is now empty;
        return; // must return in thread otherwise cant use &output
    }
    output = queueClsOut.front();
    queueClsOut.pop();
    return;
}

char* ModelManager::createJson()  // dont know why cant use &value, need return value
{
    mtxQueueCls.lock();
    ObjClassifyOutput output;
    getClsResult(output); 
    mtxQueueCls.unlock();
	
	if (output.object_list.object_num == -1){
		return nullptr;
	}
	// prepare send data json
	cJSON* json_object_list = NULL;
	cJSON* json_ObjClassifyOutput = NULL;
	
	json_ObjClassifyOutput = cJSON_CreateObject();
	json_object_list = cJSON_CreateObject();
	cJSON_AddItemToObject(json_ObjClassifyOutput, "object_list", json_object_list);
	
	int obj_num = output.object_list.object_num;
	cJSON_AddNumberToObject(json_object_list, "object_num", obj_num);
	for (int i = 0; i < obj_num; ++i){
		cJSON* json_object = cJSON_CreateObject();
		cJSON* json_box = cJSON_CreateObject();
		cJSON_AddNumberToObject(json_box,"x", output.object_list.object[i].bbox.x);
		cJSON_AddNumberToObject(json_box,"y", output.object_list.object[i].bbox.y);
		cJSON_AddNumberToObject(json_box,"w", output.object_list.object[i].bbox.w);
		cJSON_AddNumberToObject(json_box,"h", output.object_list.object[i].bbox.h);
		cJSON_AddItemToObject(json_object,"bbox", json_box);
		cJSON_AddNumberToObject(json_object, "classes", output.object_list.object[i].classes);
		cJSON_AddNumberToObject(json_object, "objectness", output.object_list.object[i].objectness);
        // double prob = output.object_list.object[i].prob;
		// cJSON_AddNumberToObject(json_object, "prob", prob); // pointer cant use?
		string str = "object" + to_string(i);
        cJSON_AddItemToObject(json_object_list, str.c_str(), json_object);
        // printf("prob: %f", output.object_list.object[i].prob);
	}
	
	char* json_output = cJSON_Print(json_ObjClassifyOutput);
    cJSON_Delete(json_ObjClassifyOutput);
    return json_output;
}

C++ 伺服器端執行

#include <../include/ServerManager.h>
#include <../include/ModelManager.h>
#include <thread>
#define PORT 8080
void recvServer(ServerManager& s, ModelManager& model){
    int idx = 0;
    
    while (true){
        // auto start = std::chrono::steady_clock::now();
        cv::Mat frame;
        s.receive_data_frame(frame, model);
        // cal time cost
        // auto end = std::chrono::steady_clock::now();
        // std::chrono::duration<double, std::milli> elapsed = end - start;
        // std::cout << "recv execution time: " << elapsed.count() << " ms\n";
        if (frame.empty()) {
            usleep(2000);
            continue;
        }
        // cv::imwrite("image"+to_string(idx++)+".jpg", frame);
        std::cout << "Image " << idx++ <<" received !!" << std::endl;
    }
}

void sendServer(ServerManager& s, ModelManager& model){
    while (true){
        if (s.send_data_frame(model)) {
            cout << "send success!!" << endl;
            cout << endl;
        }else{
            // cout << "send fail!!" << endl;
            usleep(2000);
        }
    }
}
 
int main()
{
    ServerManager s;
    ModelManager model;
    model.initDetectModel();
    model.initClassifyModel();
    cout << endl;
    s.initialization(PORT);
    s.build_connect();
    s.acceptClient();
    
    thread recv_server(recvServer, std::ref(s), std::ref(model));
    thread send_server(sendServer, std::ref(s), std::ref(model));
    thread detect(&ModelManager::DetectImg, &model);
    thread classfy(&ModelManager::ClassifyImg, &model);
    detect.join();
    classfy.join();
    recv_server.join();
    send_server.join();
    return 0;
}

python使用者端

import json
import socket
import struct
import time
from multiprocessing import JoinableQueue
from threading import Thread

import os
from natsort import ns, natsorted

host = '192.168.0.2'  # '192.168.0.2' 'localhost'
port = 8080


def img_encode(img_path):
    img = cv2.imread(img_path)
    # img = cv2.resize(img, (500, 500), interpolation=cv2.INTER_CUBIC)
    img_param = [95]  # 圖片壓縮率0-100
    _, img = cv2.imencode('.jpg', img, img_param)
    img = img.tobytes()
    return img


def img_product(img_queue, path, path_mode='image'):
    if path_mode == 'image':
        image = img_encode(path)
        img_queue.put(image)
    elif path_mode == 'dir':
        dir_list = os.listdir(path)
        files = natsorted(dir_list, alg=ns.PATH)  # 順序讀取檔名
        for filename in files:
            img_path = path + '/' + filename
            image = img_encode(img_path)
            img_queue.put(image)
        img_queue.put('E')
    img_queue.join()


def server_consumer(img_queue):
    while True:
        start = int(round(time.time() * 1000))
        # 1. get img from queue
        img_obj = img_queue.get()
        img_queue.task_done()
        # get end signal
        if img_obj[0] == 'E':
        client.close()
        break
        # 2. send package(img_bytes_size, img_bytes)
        pack_size = struct.pack("i", len(img_obj))
        client.send(pack_size + img_obj)
        end = int(round(time.time() * 1000))

        data = client.recv(65536)
        json_str = data.decode('utf8', 'ignore').strip(b'\x00'.decode())
        results = json.loads(json_str)
        end = int(round(time.time() * 1000))
        end_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        print('send and recv cost time: ', (end - start))
        print(results)


        if __name__ == '__main__':
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        client.connect((host, port))
        img_dir = 'data'
        one_img = './data/image.jpg'
        mode = 'dir'

        img_jq = JoinableQueue()
        producer = Thread(target=img_product, args=(img_jq, img_dir, mode,))
        consumer = Thread(target=server_consumer, args=(img_jq,))
        producer.daemon = True  # set daemon but not set join()

        producer.start()
        consumer.start()

    # producer.join() // 讓生產者先關閉,防止close錯誤
        consumer.join()

總結

  • 其實這個專案真正做完感覺還是挺簡單, 就是對socket通訊不太熟悉, 以及傳圖片沒做過.
  • 實際上傳圖片只需要讀取圖片後,imencode,然後tobytes,最後傳送size和data即可.而接受端只需要拼接陣列,然後imdecode即可.
  • 另外傳輸結果的話利用json傳輸可以讓結果可讀性可高, 傳輸也比較方便, 當時copy別人的傳送程式碼, 沒有細看,導致使用memcpy讓json格式亂碼,導致無法解碼json.
  • 如果你感覺接收端沒問題,一定要看看傳送端.
  • 之後的新任務就是視訊傳輸利用rtsp流,敬請期待

參考部落格