linux環境程式設計(3): 使用POSIX IPC完成程序間通訊

2023-02-06 18:00:47

1. 寫在前面

之前的文章總結了使用管道進行程序間通訊的方法,除了pipe和fifo,Linux核心還為我們提供了其他更高階的IPC方式,包括共用記憶體,訊息佇列,號誌等,本篇文章會通過一個具有完整邏輯功能的範例說明如何使用這些IPC方法。畢竟單純地查手冊,寫程式碼...周而復始,這個過程還是比較枯燥的,而且並沒有哪個IPC方法能解決所有的程序間通訊問題,每種方法都不是孤立存在的,通過一個小例子把它們串聯起來,是一種更好的學習方式。下文中的程式碼實現可以參考我的程式碼倉庫

2. POSIX IPC概述

程序間通訊,主要解決兩個問題,即資料傳遞和同步。POSIX IPC提供了下面三種方法:

  • 訊息佇列
  • 共用記憶體
  • 號誌

作業系統中執行的程序,彼此之間是隔離的,要想實現通訊,就必須有一個媒介,是通訊雙方都可以存取到的。從這個角度看,作業系統核心正是每個程序都可以存取到的那個媒介,就像一個"全域性變數"。訊息佇列不過是核心維護的一個佇列,儲存了使用者程序傳送來的訊息,其他程序可以從佇列中取走訊息,每個訊息可以設定優先順序,程序傳送和接收訊息的行為可以是阻塞或者非阻塞的,這一點類似管道;共用記憶體就是利用了虛擬地址空間以及實體地址空間,讓不同程序的虛擬地址對映到同一個物理頁面上,這樣就實現了共用,對於對映的地址空間可以設定可讀,可寫以及可執行的標誌;號誌就像一個核心中的整型變數,這個變數的數值記錄了某種資源的數量,程序可以對它進行加減操作,合理使用的話就能完成想要的程序之間的同步邏輯。可以看到,這三種IPC方法在核心中都對應了一種資料結構,為了能夠讓使用者程序存取到這些資料結構,POSIX IPC延續了「一切皆檔案」的設計思路,我們可以用類似「/somename」這種形式的檔名去建立或者開啟這些IPC物件,然後對它們進行各種操作,和檔案的存取許可權類似,程序操作IPC物件時也會進行許可權檢查。可能上面對三種POSIX IPC的描述存在不嚴謹的地方,但對於使用者來說,我們只要在腦子裡建立一個合適的,能夠描述它們工作方式的模型就可以了,而不是不斷重複手冊中對每個api的敘述。下面的表格列出了常用的POSIX IPC api:

訊息佇列 共用記憶體 號誌
開啟 mq_open shm_open sem_open
關閉 mq_close shm_close sem_close
操作 mq_send/mq_receive 記憶體讀寫 sem_wait/sem_post
刪除 mq_unlink shm_unlink sem_unlink

3. POSIX IPC使用

3.1 專案功能說明

下面將使用三種POSIX IPC實現一個簡單的專案,用來記錄IPC的使用方法。專案包含一個server程序和若干個client程序,他們各自的功能如下:

  • server程序
    • 首先執行,等待client的連線到來;
    • 收到client的連線,fork出一個新的程序去處理client的請求;
  • client程序
    • 可以同時執行多個;
    • 啟動時和server建立連線,連線建立完成之後,接受用於輸入,向server發起請求;
    • 可以完成主動斷開連線,終止server程序,以及其他操作;

首先啟動server程序,然後啟動多個client程序向server傳送請求,專案實現之後的效果如下:

3.2 專案實現原理

  1. client如何和server建立連線?

    client程序和server程序都可以存取一段共用記憶體,當server程序啟動時,會對這段共用記憶體進行初始化,初始化完成之後,server對號誌A執行post操作,表明共用記憶體準備完畢,之後server程序就通過號誌B等待新連線的建立;當有新的client程序想建立連線時,會先通過對號誌A執行wait操作,等待共用記憶體可用,如果可用,client會把請求引數寫到共用記憶體之中,寫入完成後會對號誌B執行post操作,通知server程序有新的連線已經建立。

  2. client建立連線之後如何傳送請求?

    client通過兩個訊息佇列實現傳送請求和接收響應。在client建立連線時,會在共用記憶體中寫入用於和server通訊的兩個訊息佇列的名字,server在處理連線時會開啟訊息佇列,然後和client進行通訊。對於每個新建立的連線,server會fork出一個新的程序去處理該連線對應的client傳送來的請求。

  3. client如何通過傳送請求關閉server?

    client通過向請求訊息佇列中寫入kill_server請求,可以實現關閉server。當server程序fork出的程序從訊息佇列中讀到kill_server請求,該程序會通過管道寫入資料,通知server的主程序結束執行。

  4. server和client之間的時序關係:

    通過前面3點的描述可以看出,這個簡單的專案几乎用到了全部常用的IPC方法,下面這個時序圖更直觀地說明了其工作原理:

    server和client之間的同步操作主要集中在步驟6,7,8,9。當server準備好共用記憶體之後,通過第6步的號誌A通知client可以建立連線了,之後client向共用記憶體寫入資料,再操作第9步的號誌B通知server連線資料已經寫入,最後server會建立子程序去處理client的請求。實際上server的主程序是一個迴圈,處理請求都是在server的子程序中完成的,以上內容說明了server主程序在迴圈中完成的工作。

  5. 資源清理

    當我們使用POSIX IPC時,核心會建立相應的資料結構,並且通過檔案系統介面展示給使用者,但IPC資源不能無限建立,當我們的程式執行結束之後應該清理自己用到的IPC資源。執行程式時建立的POSIX IPC物件可以在/dev/shm以及/dev/mqueue下檢視,程式結束之後,server和client會釋放掉自己建立的IPC資源。所以,要檢視server和client建立的共用記憶體,號誌以及訊息佇列,需要在程式執行期間檢視上述的兩個目錄。

3.3 主要程式碼功能

  • 訊息格式:

    server和client之間通過訊息佇列傳遞請求和響應資料,訊息佇列中訊息格式定義如下:

    struct msgbuf {
        int type;
        union {
            struct {
                int a;
                int b;
            } request_add;
    
            struct {
                int c;
            } response_add;
    
            struct {
                int disconect;
            } request_disconnect;
    
            struct {
                int kill_server;
            } request_kill_server;
        } data;
    };
    
  • server主程序:

    int main(int argc, char **argv) {
    
        int err = server_init();
        if (err) {
            log_warning("server_init failed\n");
            return -1;
        }
        server_start();
        server_shutdown();
    
        return 0;
    }
    

    其中,在server_init中,server會建立需要使用的共用記憶體,號誌以及管道。

    int server_init() {
        memset(&ipc_server, 0, sizeof(ipc_server));
    
        // shared memory init
        ipc_server.conn_buf_fd =
            shm_open(CONNECTION_SHM, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR);
    
        ...
    
        if (ftruncate(ipc_server.conn_buf_fd, CONNECTION_SHM_SIZE) < 0) {
            log_warning("server failed ftruncate\n");
            return -1;
        }
    
        ipc_server.conn_buf = (struct connection *)mmap(
            NULL, CONNECTION_SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED,
            ipc_server.conn_buf_fd, 0);
    
        ...
    
        memset(ipc_server.conn_buf, 0, CONNECTION_SHM_SIZE);
    
        ipc_server.conn_buf_ready =
            sem_open(CONNECTION_BUF_SEM, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR, 0);
    
        ...
    
        ipc_server.conn_new_ready =
            sem_open(CONNECTION_NEW_SEM, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR, 0);
        
        ...
    
        // pipe init
        int pipefd[2];
        if (pipe2(ipc_server.pipefd, O_NONBLOCK)) {
            log_warning("server failed pipe2\n");
            return -1;
        }
    
        log_info("server init done\n");
        return 0;
    }
    

    在server_start中會迴圈處理來自client的連線。

    void server_start() {
        int err = sem_post(ipc_server.conn_buf_ready);
    
        ...
    
        struct connection conn;
        int stop = 0;
        while (!stop) {
            // handle new connection
            sem_wait(ipc_server.conn_new_ready);
            if (read(ipc_server.pipefd[0], &stop, sizeof(int)) <= 0)
                stop = 0;
    
            if (ipc_server.conn_buf->valid) {
                log_info("new connection established\n");
                memcpy(&conn, ipc_server.conn_buf, sizeof(conn));
                handle_connection(&conn);
                memset(ipc_server.conn_buf, 0, sizeof(struct connection));
                sem_post(ipc_server.conn_buf_ready);
            }
        }
    }
    

    當server主程序退出之後,server_shutdown會清理IPC資源。

  • client程序:

    client啟動之後,首先會嘗試和server建立連線,建立連線之後會迴圈處理使用者輸入,通過訊息佇列向server的服務程序傳送請求。

    int main(int argc, char **argv) {
    
        if (build_connection()) {
            log_info("client failed build_connection\n");
            return -1;
        }
        handle_command();
        cleanup();
    
        log_info("client %d exit\n", getpid());
        return 0;
    }
    

    client建立連線的過程如下:建立連線時client需要等待共用記憶體可用,並且在寫入連線資料之後通知server,這些同步操作都是通過號誌實現的。

    int build_connection() {
        
        ...
    
        connection.mqreq_fd =
            mq_open(connection.mqreq, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR, &attr);
        connection.mqrsp_fd =
            mq_open(connection.mqrsp, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR, &attr);
    
        ...
    
        // open and map shared memory
        int fd = shm_open(CONNECTION_SHM, O_RDWR, 0);
        void *conn_buf = mmap(NULL, CONNECTION_SHM_SIZE, PROT_READ | PROT_WRITE,
                            MAP_SHARED, fd, 0);
    
        // write connection to conn_buf, notify server new conncection is comming
        sem_t *conn_buf_ready = sem_open(CONNECTION_BUF_SEM, O_RDWR);
        sem_t *conn_new_ready = sem_open(CONNECTION_NEW_SEM, O_RDWR);
    
        ...
    
        sem_wait(conn_buf_ready);
        connection.valid = 1;
        memcpy(conn_buf, &connection, sizeof(connection));
        sem_post(conn_new_ready);
    
        ...
    
        return 0;
    }
    

寫在最後

通過一個包含server和client的程式碼範例,說明了POSIX IPC中共用記憶體,訊息佇列以及號誌的使用方法。具體實現可以參考我的程式碼倉庫