Linux 下的進程間通訊:使用管道和訊息佇列

2019-05-12 13:43:00

學習在 Linux 中進程是如何與其他進程進行同步的。

本篇是 Linux 下進程間通訊(IPC)系列的第二篇文章。 聚焦於通過共用檔案和共用記憶體段這樣的共用儲存來進行 IPC。這篇檔案的重點將轉向管道,它是連線需要通訊的進程之間的通道。管道擁有一個寫端用於寫入位元組資料,還有一個讀端用於按照先入先出的順序讀入這些位元組資料。而這些位元組資料可能代表任何東西:數位、員工記錄、數位電影等等。

管道有兩種型別,命名管道和無名管道,都可以互動式的在命令列或程式中使用它們;相關的例子在下面展示。這篇文章也將介紹記憶體佇列,儘管它們有些過時了,但它們不應該受這樣的待遇。

在本系列的第一篇文章中的範例程式碼承認了在 IPC 中可能受到競爭條件(不管是基於檔案的還是基於記憶體的)的威脅。自然地我們也會考慮基於管道的 IPC 的安全並行問題,這個也將在本文中提及。針對管道和記憶體佇列的例子將會使用 POSIX 推薦使用的 API,POSIX 的一個核心目標就是執行緒安全。

請檢視一些 mq_open 函數的 man 頁,這個函數屬於記憶體佇列的 API。這個 man 頁中有關 特性 的章節帶有一個小表格:

介面特性
mq_open()執行緒安全MT-Safe

上面的 MT-Safe(MT 指的是多執行緒multi-threaded)意味著 mq_open 函數是執行緒安全的,進而暗示是進程安全的:一個進程的執行和它的一個執行緒執行的過程類似,假如競爭條件不會發生在處於相同進程的執行緒中,那麼這樣的條件也不會發生在處於不同進程的執行緒中。MT-Safe 特性保證了呼叫 mq_open 時不會出現競爭條件。一般來說,基於通道的 IPC 是並行安全的,儘管在下面例子中會出現一個有關警告的注意事項。

無名管道

首先讓我們通過一個特意構造的命令列例子來展示無名管道是如何工作的。在所有的現代系統中,符號 | 在命令列中都代表一個無名管道。假設我們的命令列提示符為 %,接下來考慮下面的命令:

## 寫入方在 | 左邊,讀取方在右邊% sleep 5 | echo "Hello, world!" 

sleepecho 程式以不同的進程執行,無名管道允許它們進行通訊。但是上面的例子被特意設計為沒有通訊發生。問候語 “Hello, world!” 出現在螢幕中,然後過了 5 秒後,命令列返回,暗示 sleepecho 進程都已經結束了。這期間發生了什麼呢?

在命令列中的豎線 | 的語法中,左邊的進程(sleep)是寫入方,右邊的進程(echo)為讀取方。預設情況下,讀取方將會阻塞,直到從通道中能夠讀取到位元組資料,而寫入方在寫完它的位元組資料後,將傳送 流已終止end-of-stream的標誌。(即便寫入方過早終止了,一個流已終止的標誌還是會發給讀取方。)無名管道將保持到寫入方和讀取方都停止的那個時刻。

在上面的例子中,sleep 進程並沒有向通道寫入任何的位元組資料,但在 5 秒後就終止了,這時將向通道傳送一個流已終止的標誌。與此同時,echo 進程立即向標準輸出(螢幕)寫入問候語,因為這個進程並不從通道中讀入任何位元組,所以它並沒有等待。一旦 sleepecho 進程都終止了,不會再用作通訊的無名管道將會消失然後返回命令列提示符。

下面這個更加實用的範例將使用兩個無名管道。我們假定檔案 test.dat 的內容如下:

thisisthewaytheworldends

下面的命令:

% cat test.dat | sort | uniq

會將 cat連線concatenate的縮寫)進程的輸出通過管道傳給 sort 進程以生成排序後的輸出,然後將排序後的輸出通過管道傳給 uniq 進程以消除重複的記錄(在本例中,會將兩次出現的 “the” 縮減為一個):

endsisthethiswayworld

下面展示的情景展示的是一個帶有兩個進程的程式通過一個無名管道通訊來進行通訊。

範例 1. 兩個進程通過一個無名管道來進行通訊

#include <sys/wait.h> /* wait */#include <stdio.h>#include <stdlib.h>   /* exit functions */#include <unistd.h>   /* read, write, pipe, _exit */#include <string.h>#define ReadEnd  0#define WriteEnd 1void report_and_exit(const char* msg) {  perror(msg);  exit(-1);    /** failure **/}int main() {  int pipeFDs[2]; /* two file descriptors */  char buf;       /* 1-byte buffer */  const char* msg = "Nature's first green is gold\n"; /* bytes to write */  if (pipe(pipeFDs) < 0) report_and_exit("pipeFD");  pid_t cpid = fork();                                /* fork a child process */  if (cpid < 0) report_and_exit("fork");              /* check for failure */  if (0 == cpid) {    /*** child ***/                 /* child process */    close(pipeFDs[WriteEnd]);                         /* child reads, doesn't write */    while (read(pipeFDs[ReadEnd], &buf, 1) > 0)       /* read until end of byte stream */      write(STDOUT_FILENO, &buf, sizeof(buf));        /* echo to the standard output */    close(pipeFDs[ReadEnd]);                          /* close the ReadEnd: all done */    _exit(0);                                         /* exit and notify parent at once  */  }  else {              /*** parent ***/    close(pipeFDs[ReadEnd]);                          /* parent writes, doesn't read */    write(pipeFDs[WriteEnd], msg, strlen(msg));       /* write the bytes to the pipe */    close(pipeFDs[WriteEnd]);                         /* done writing: generate eof */    wait(NULL);                                       /* wait for child to exit */    exit(0);                                          /* exit normally */  }  return 0;}

上面名為 pipeUN 的程式使用系統函數 fork 來建立一個進程。儘管這個程式只有一個單一的原始檔,在它正確執行的情況下將會發生多進程的情況。

下面的內容是對庫函數 fork 如何工作的一個簡要回顧:

  • fork 函數由進程呼叫,在失敗時返回 -1 給父進程。在 pipeUN 這個例子中,相應的呼叫是:

    pid_t cpid = fork(); /* called in parent */

    函數呼叫後的返回值也被儲存下來了。在這個例子中,儲存在整數型別 pid_t 的變數 cpid 中。(每個進程有它自己的進程 ID,這是一個非負的整數,用來標記進程)。復刻一個新的進程可能會因為多種原因而失敗,包括進程表滿了的原因,這個結構由系統維持,以此來追蹤進程狀態。明確地說,殭屍進程假如沒有被處理掉,將可能引起進程表被填滿的錯誤。

  • 假如 fork 呼叫成功,則它將建立一個新的子進程,向父進程返回一個值,向子進程返回另外的一個值。在呼叫 fork 後父進程和子進程都將執行相同的程式碼。(子進程繼承了到此為止父進程中宣告的所有變數的拷貝),特別地,一次成功的 fork 呼叫將返回如下的東西:

    • 向子進程返回 0
    • 向父進程返回子進程的進程 ID
  • 在一次成功的 fork 呼叫後,一個 if/else 或等價的結構將會被用來隔離針對父進程和子進程的程式碼。在這個例子中,相應的宣告為:

    if (0 == cpid) { /*** child ***/...}else { /*** parent ***/...} 

假如成功地復刻出了一個子進程,pipeUN 程式將像下面這樣去執行。在一個整數的數列裡:

int pipeFDs[2]; /* two file descriptors */

來儲存兩個檔案描述符,一個用來向管道中寫入,另一個從管道中寫入。(陣列元素 pipeFDs[0] 是讀端的檔案描述符,元素 pipeFDs[1] 是寫端的檔案描述符。)在呼叫 fork 之前,對系統 pipe 函數的成功呼叫,將立刻使得這個陣列獲得兩個檔案描述符:

if (pipe(pipeFDs) < 0) report_and_exit("pipeFD");

父進程和子進程現在都有了檔案描述符的副本。但分離關注點模式意味著每個進程恰好只需要一個描述符。在這個例子中,父進程負責寫入,而子進程負責讀取,儘管這樣的角色分配可以反過來。在 if 子句中的第一個語句將用於關閉管道的讀端:

close(pipeFDs[WriteEnd]); /* called in child code */

在父進程中的 else 子句將會關閉管道的讀端:

close(pipeFDs[ReadEnd]); /* called in parent code */

然後父進程將向無名管道中寫入某些位元組資料(ASCII 程式碼),子進程讀取這些資料,然後向標準輸出中回放它們。

在這個程式中還需要澄清的一點是在父進程程式碼中的 wait 函數。一旦被建立後,子進程很大程度上獨立於它的父進程,正如簡短的 pipeUN 程式所展示的那樣。子進程可以執行任意的程式碼,而它們可能與父進程完全沒有關係。但是,假如當子進程終止時,系統將會通過一個信號來通知父進程。

要是父進程在子進程之前終止又該如何呢?在這種情形下,除非採取了預防措施,子進程將會變成在進程表中的一個殭屍進程。預防措施有兩大型別:第一種是讓父進程去通知系統,告訴系統它對子進程的終止沒有任何興趣:

signal(SIGCHLD, SIG_IGN); /* in parent: ignore notification */

第二種方法是在子進程終止時,讓父進程執行一個 wait。這樣就確保了父進程可以獨立於子進程而存在。在 pipeUN 程式中使用了第二種方法,其中父進程的程式碼使用的是下面的呼叫:

wait(NULL); /* called in parent */

這個對 wait 的呼叫意味著一直等待直到任意一個子進程的終止發生,因此在 pipeUN 程式中,只有一個子進程。(其中的 NULL 引數可以被替換為一個儲存有子程式退出狀態的整數變數的地址。)對於更細粒度的控制,還可以使用更靈活的 waitpid 函數,例如特別指定多個子進程中的某一個。

pipeUN 將會採取另一個預防措施。當父進程結束了等待,父進程將會呼叫常規的 exit 函數去退出。對應的,子進程將會呼叫 _exit 變種來退出,這類變種將快速跟蹤終止相關的通知。在效果上,子進程會告訴系統立刻去通知父進程它的這個子進程已經終止了。

假如兩個進程向相同的無名管道中寫入內容,位元組資料會交錯嗎?例如,假如進程 P1 向管道寫入內容:

foo bar

同時進程 P2 並行地寫入:

baz baz

到相同的管道,最後的結果似乎是管道中的內容將會是任意錯亂的,例如像這樣:

baz foo baz bar

只要沒有寫入超過 PIPE_BUF 位元組,POSIX 標準就能確保寫入不會交錯。在 Linux 系統中, PIPE_BUF 的大小是 4096 位元組。對於管道我更喜歡只有一個寫入方和一個讀取方,從而繞過這個問題。

命名管道

無名管道沒有備份檔案:系統將維持一個記憶體快取來將位元組資料從寫方傳給讀方。一旦寫方和讀方終止,這個快取將會被回收,進而無名管道消失。相反的,命名管道有備份檔案和一個不同的 API。

下面讓我們通過另一個命令列範例來了解命名管道的要點。下面是具體的步驟:

  • 開啟兩個終端。這兩個終端的工作目錄應該相同。
  • 在其中一個終端中,鍵入下面的兩個命令(命令列提示符仍然是 %,我的注釋以 ## 打頭。):

    % mkfifo tester ## 建立一個備份檔案,名為 tester% cat tester    ## 將管道的內容輸出到 stdout 

    在最開始,沒有任何東西會出現在終端中,因為到現在為止沒有在命名管道中寫入任何東西。

  • 在第二個終端中輸入下面的命令:

    % cat > tester ## redirect keyboard input to the pipehello, world!  ## then hit Return keybye, bye       ## ditto<Control-C>    ## terminate session with a Control-C

    無論在這個終端中輸入什麼,它都會在另一個終端中顯示出來。一旦鍵入 Ctrl+C,就會回到正常的命令列提示符,因為管道已經被關閉了。

  • 通過移除實現命名管道的檔案來進行清理:

    % unlink tester

正如 mkfifo 程式的名字所暗示的那樣,命名管道也被叫做 FIFO,因為第一個進入的位元組,就會第一個出,其他的類似。有一個名為 mkfifo 的庫函數,用它可以在程式中建立一個命名管道,它將在下一個範例中被用到,該範例由兩個行程群組成:一個向命名管道寫入,而另一個從該管道讀取。

範例 2. fifoWriter 程式

#include <sys/types.h>#include <sys/stat.h>#include <fcntl.h>#include <unistd.h>#include <time.h>#include <stdlib.h>#include <stdio.h>#define MaxLoops         12000   /* outer loop */#define ChunkSize           16   /* how many written at a time */#define IntsPerChunk         4   /* four 4-byte ints per chunk */#define MaxZs              250   /* max microseconds to sleep */int main() {  const char* pipeName = "./fifoChannel";  mkfifo(pipeName, 0666);                      /* read/write for user/group/others */  int fd = open(pipeName, O_CREAT | O_WRONLY); /* open as write-only */  if (fd < 0) return -1;                       /* can't go on */  int i;  for (i = 0; i < MaxLoops; i++) {          /* write MaxWrites times */    int j;    for (j = 0; j < ChunkSize; j++) {       /* each time, write ChunkSize bytes */      int k;      int chunk[IntsPerChunk];      for (k = 0; k < IntsPerChunk; k++)        chunk[k] = rand();      write(fd, chunk, sizeof(chunk));    }    usleep((rand() % MaxZs) + 1);           /* pause a bit for realism */  }  close(fd);           /* close pipe: generates an end-of-stream marker */  unlink(pipeName);    /* unlink from the implementing file */  printf("%i ints sent to the pipe.\n", MaxLoops * ChunkSize * IntsPerChunk);  return 0;}

上面的 fifoWriter 程式可以被總結為如下:

  • 首先程式建立了一個命名管道用來寫入資料:

    mkfifo(pipeName, 0666); /* read/write perms for user/group/others */int fd = open(pipeName, O_CREAT | O_WRONLY);

    其中的 pipeName 是備份檔案的名字,傳遞給 mkfifo 作為它的第一個引數。接著命名管道通過我們熟悉的 open 函數呼叫被開啟,而這個函數將會返回一個檔案描述符。

  • 在實現層面上,fifoWriter 不會一次性將所有的資料都寫入,而是寫入一個塊,然後休息亂數目的微秒時間,接著再迴圈往復。總的來說,有 768000 個 4 位元組整數值被寫入到命名管道中。

  • 在關閉命名管道後,fifoWriter 也將使用 unlink 取消對該檔案的連線。

    close(fd); /* close pipe: generates end-of-stream marker */unlink(pipeName); /* unlink from the implementing file */

    一旦連線到管道的每個進程都執行了 unlink 操作後,系統將回收這些備份檔案。在這個例子中,只有兩個這樣的進程 fifoWriterfifoReader,它們都做了 unlink 操作。

這個兩個程式應該在不同終端的相同工作目錄中執行。但是 fifoWriter 應該在 fifoReader 之前被啟動,因為需要 fifoWriter 去建立管道。然後 fifoReader 才能夠獲取到剛被建立的命名管道。

範例 3. fifoReader 程式

#include <stdio.h>#include <stdlib.h>#include <string.h>#include <fcntl.h>#include <unistd.h>unsigned is_prime(unsigned n) { /* not pretty, but efficient */  if (n <= 3) return n > 1;  if (0 == (n % 2) || 0 == (n % 3)) return 0;  unsigned i;  for (i = 5; (i * i) <= n; i += 6)    if (0 == (n % i) || 0 == (n % (i + 2))) return 0;  return 1; /* found a prime! */}int main() {  const char* file = "./fifoChannel";  int fd = open(file, O_RDONLY);  if (fd < 0) return -1; /* no point in continuing */  unsigned count = 0, total = 0, primes_count = 0;  while (1) {    int next;    int i;    ssize_t count = read(fd, &next, sizeof(int));    if (0 == count) break;                  /* end of stream */    else if (count == sizeof(int)) {        /* read a 4-byte int value */      total++;      if (is_prime(next)) primes_count++;    }  }  close(fd);       /* close pipe from read end */  unlink(file);    /* unlink from the underlying file */  printf("Received ints: %u, primes: %u\n", total, primes_count);  return 0;}

上面的 fifoReader 的內容可以總結為如下:

  • 因為 fifoWriter 已經建立了命名管道,所以 fifoReader 只需要利用標準的 open 呼叫來通過備份檔案來獲取到管道中的內容:

    const char* file = "./fifoChannel";int fd = open(file, O_RDONLY);

    這個檔案的是以唯讀開啟的。

  • 然後這個程式進入一個潛在的無限迴圈,在每次回圈時,嘗試讀取 4 位元組的塊。read 呼叫:

    ssize_t count = read(fd, &next, sizeof(int));

    返回 0 來暗示該流的結束。在這種情況下,fifoReader 跳出迴圈,關閉命名管道,並在終止前 unlink 備份檔案。

  • 在讀入 4 位元組整數後,fifoReader 檢查這個數是否為質數。這個操作代表了一個生產級別的讀取器可能在接收到的位元組資料上執行的邏輯操作。在範例執行中,在接收到的 768000 個整數中有 37682 個質數。

重複執行範例, fifoReader 將成功地讀取 fifoWriter 寫入的所有位元組。這不是很讓人驚訝的。這兩個進程在相同的機器上執行,從而可以不用考慮網路相關的問題。命名管道是一個可信且高效的 IPC 機制,因而被廣泛使用。

下面是這兩個程式的輸出,它們在不同的終端中啟動,但處於相同的工作目錄:

% ./fifoWriter768000 ints sent to the pipe.###% ./fifoReaderReceived ints: 768000, primes: 37682

訊息佇列

管道有著嚴格的先入先出行為:第一個被寫入的位元組將會第一個被讀,第二個寫入的位元組將第二個被讀,以此類推。訊息佇列可以做出相同的表現,但它又足夠靈活,可以使得位元組塊可以不以先入先出的次序來接收。

正如它的名字所提示的那樣,訊息佇列是一系列的訊息,每個訊息包含兩部分:

  • 荷載,一個位元組序列(在 C 中是 char)
  • 型別,以一個正整數值的形式給定,型別用來分類訊息,為了更靈活的回收

看一下下面對一個訊息佇列的描述,每個訊息由一個整數型別標記:

          +-+    +-+    +-+    +-+sender--->|3|--->|2|--->|2|--->|1|--->receiver          +-+    +-+    +-+    +-+

在上面展示的 4 個訊息中,標記為 1 的是開頭,即最接近接收端,然後另個標記為 2 的訊息,最後接著一個標記為 3 的訊息。假如按照嚴格的 FIFO 行為執行,訊息將會以 1-2-2-3 這樣的次序被接收。但是訊息佇列允許其他收取次序。例如,訊息可以被接收方以 3-2-1-2 的次序接收。

mqueue 範例包含兩個程式,sender 將向訊息佇列中寫入資料,而 receiver 將從這個佇列中讀取資料。這兩個程式都包含的標頭檔案 queue.h 如下所示:

範例 4. 標頭檔案 queue.h

#define ProjectId 123#define PathName  "queue.h" /* any existing, accessible file would do */#define MsgLen    4#define MsgCount  6typedef struct {   long type;                 /* must be of type long */   char payload[MsgLen + 1];  /* bytes in the message */  } queuedMessage;

上面的標頭檔案定義了一個名為 queuedMessage 的結構型別,它帶有 payload(位元組陣列)和 type(整數)這兩個域。該檔案也定義了一些符號常數(使用 #define 語句),前兩個常數被用來生成一個 key,而這個 key 反過來被用來獲取一個訊息佇列的 ID。ProjectId 可以是任何正整數值,而 PathName 必須是一個存在的、可存取的檔案,在這個範例中,指的是檔案 queue.h。在 senderreceiver 中,它們都有的設定語句為:

key_t key = ftok(PathName, ProjectId); /* generate key */int qid = msgget(key, 0666 | IPC_CREAT); /* use key to get queue id */

ID qid 在效果上是訊息佇列檔案描述符的對應物。

範例 5. sender 程式

#include <stdio.h>#include <sys/ipc.h>#include <sys/msg.h>#include <stdlib.h>#include <string.h>#include "queue.h"void report_and_exit(const char* msg) {  perror(msg);  exit(-1); /* EXIT_FAILURE */}int main() {  key_t key = ftok(PathName, ProjectId);  if (key < 0) report_and_exit("couldn't get key...");  int qid = msgget(key, 0666 | IPC_CREAT);  if (qid < 0) report_and_exit("couldn't get queue id...");  char* payloads[] = {"msg1", "msg2", "msg3", "msg4", "msg5", "msg6"};  int types[] = {1, 1, 2, 2, 3, 3}; /* each must be > 0 */  int i;  for (i = 0; i < MsgCount; i++) {    /* build the message */    queuedMessage msg;    msg.type = types[i];    strcpy(msg.payload, payloads[i]);    /* send the message */    msgsnd(qid, &msg, sizeof(msg), IPC_NOWAIT); /* don't block */    printf("%s sent as type %i\n", msg.payload, (int) msg.type);  }  return 0;}

上面的 sender 程式將傳送出 6 個訊息,每兩個為一個型別:前兩個是型別 1,接著的連個是型別 2,最後的兩個為型別 3。傳送的語句:

msgsnd(qid, &msg, sizeof(msg), IPC_NOWAIT);

被設定為非阻塞的(IPC_NOWAIT 標誌),是因為這裡的訊息體量上都很小。唯一的危險在於一個完整的序列將可能導致傳送失敗,而這個例子不會。下面的 receiver 程式也將使用 IPC_NOWAIT 標誌來接收訊息。

範例 6. receiver 程式

#include <stdio.h>#include <sys/ipc.h>#include <sys/msg.h>#include <stdlib.h>#include "queue.h"void report_and_exit(const char* msg) {  perror(msg);  exit(-1); /* EXIT_FAILURE */}int main() {  key_t key= ftok(PathName, ProjectId); /* key to identify the queue */  if (key < 0) report_and_exit("key not gotten...");  int qid = msgget(key, 0666 | IPC_CREAT); /* access if created already */  if (qid < 0) report_and_exit("no access to queue...");  int types[] = {3, 1, 2, 1, 3, 2}; /* different than in sender */  int i;  for (i = 0; i < MsgCount; i++) {    queuedMessage msg; /* defined in queue.h */    if (msgrcv(qid, &msg, sizeof(msg), types[i], MSG_NOERROR | IPC_NOWAIT) < 0)      puts("msgrcv trouble...");    printf("%s received as type %i\n", msg.payload, (int) msg.type);  }  /** remove the queue **/  if (msgctl(qid, IPC_RMID, NULL) < 0)  /* NULL = 'no flags' */    report_and_exit("trouble removing queue...");  return 0;}

這個 receiver 程式不會建立訊息佇列,儘管 API 儘管建議那樣。在 receiver 中,對

int qid = msgget(key, 0666 | IPC_CREAT);

的呼叫可能因為帶有 IPC_CREAT 標誌而具有誤導性,但是這個標誌的真實意義是如果需要就建立,否則直接獲取sender 程式呼叫 msgsnd 來傳送訊息,而 receiver 呼叫 msgrcv 來接收它們。在這個例子中,sender 以 1-1-2-2-3-3 的次序傳送訊息,但 receiver 接收它們的次序為 3-1-2-1-3-2,這顯示訊息佇列沒有被嚴格的 FIFO 行為所拘泥:

% ./sendermsg1 sent as type 1msg2 sent as type 1msg3 sent as type 2msg4 sent as type 2msg5 sent as type 3msg6 sent as type 3% ./receivermsg5 received as type 3msg1 received as type 1msg3 received as type 2msg2 received as type 1msg6 received as type 3msg4 received as type 2

上面的輸出顯示 senderreceiver 可以在同一個終端中啟動。輸出也顯示訊息佇列是持久的,即便 sender 進程在完成建立佇列、向佇列寫資料、然後退出的整個過程後,該佇列仍然存在。只有在 receiver 進程顯式地呼叫 msgctl 來移除該佇列,這個佇列才會消失:

if (msgctl(qid, IPC_RMID, NULL) < 0) /* remove queue */

總結

管道和訊息佇列的 API 在根本上來說都是單向的:一個進程寫,然後另一個進程讀。當然還存在雙向命名管道的實現,但我認為這個 IPC 機制在它最為簡單的時候反而是最佳的。正如前面提到的那樣,訊息佇列已經不大受歡迎了,儘管沒有找到什麼特別好的原因來解釋這個現象;而佇列仍然是 IPC 工具箱中的一個工具。這個快速的 IPC 工具箱之旅將以第 3 部分(通過通訊端和信號來範例 IPC)來終結。