Linux 下的進程間通訊:共用儲存

2019-05-07 12:26:00

學習在 Linux 中進程是如何與其他進程進行同步的。

本篇是 Linux 下進程間通訊(IPC)系列的第一篇文章。這個系列將使用 C 語言程式碼範例來闡明以下 IPC 機制:

  • 共用檔案
  • 共用記憶體(使用號誌)
  • 管道(命名的或非命名的管道)
  • 訊息佇列
  • 通訊端
  • 信號

在聚焦上面提到的共用檔案和共用記憶體這兩個機制之前,這篇文章將帶你回顧一些核心的概念。

核心概念

進程是執行著的程式,每個進程都有著它自己的地址空間,這些空間由進程被允許存取的記憶體地址組成。進程有一個或多個執行執行緒,而執行緒是一系列執行指令的集合:單執行緒進程就只有一個執行緒,而多執行緒的進程則有多個執行緒。一個進程中的執行緒共用各種資源,特別是地址空間。另外,一個進程中的執行緒可以直接通過共用記憶體來進行通訊,儘管某些現代語言(例如 Go)鼓勵一種更有序的方式,例如使用執行緒安全的通道。當然對於不同的進程,預設情況下,它們能共用記憶體。

有多種方法啟動之後要進行通訊的進程,下面所舉的例子中主要使用了下面的兩種方法:

  • 一個終端被用來啟動一個進程,另外一個不同的終端被用來啟動另一個。
  • 在一個進程(父進程)中呼叫系統函數 fork,以此生發另一個進程(子進程)。

第一個例子採用了上面使用終端的方法。這些程式碼範例的 ZIP 壓縮包可以從我的網站下載到。

共用檔案

程式設計師對檔案存取應該都已經很熟識了,包括許多坑(不存在的檔案、檔案許可權損壞等等),這些問題困擾著程式對檔案的使用。儘管如此,共用檔案可能是最為基礎的 IPC 機制了。考慮一下下面這樣一個相對簡單的例子,其中一個進程(生產者 producer)建立和寫入一個檔案,然後另一個進程(消費者 consumer)從這個相同的檔案中進行讀取:

          writes +-----------+ readsproducer-------->| disk file |<-------consumer                 +-----------+

在使用這個 IPC 機制時最明顯的挑戰是競爭條件可能會發生:生產者和消費者可能恰好在同一時間存取該檔案,從而使得輸出結果不確定。為了避免競爭條件的發生,該檔案在處於狀態時必須以某種方式處於被鎖狀態,從而阻止在操作執行時和其他操作的衝突。在標準系統庫中與鎖相關的 API 可以被總結如下:

  • 生產者應該在寫入檔案時獲得一個檔案的排斥鎖。一個排斥鎖最多被一個進程所擁有。這樣就可以排除掉競爭條件的發生,因為在鎖被釋放之前沒有其他的進程可以存取這個檔案。
  • 消費者應該在從檔案中讀取內容時得到至少一個共用鎖。多個讀取者可以同時保有一個共用鎖,但是沒有寫入者可以獲取到檔案內容,甚至在當只有一個讀取者保有一個共用鎖時。

共用鎖可以提升效率。假如一個進程只是讀入一個檔案的內容,而不去改變它的內容,就沒有什麼原因阻止其他進程來做同樣的事。但如果需要寫入內容,則很顯然需要檔案有排斥鎖。

標準的 I/O 庫中包含一個名為 fcntl 的實用函數,它可以被用來檢查或者操作一個檔案上的排斥鎖和共用鎖。該函數通過一個檔案描述符(一個在進程中的非負整數值)來標記一個檔案(在不同的進程中不同的檔案描述符可能標記同一個物理檔案)。對於檔案的鎖定, Linux 提供了名為 flock 的庫函數,它是 fcntl 的一個精簡包裝。第一個例子中使用 fcntl 函數來暴露這些 API 細節。

範例 1. 生產者程式

#include <stdio.h>#include <stdlib.h>#include <fcntl.h>#include <unistd.h>#include <string.h>#define FileName "data.dat"#define DataString "Now is the winter of our discontent\nMade glorious summer by this sun of York\n"void report_and_exit(const char* msg) {  perror(msg);  exit(-1); /* EXIT_FAILURE */}int main() {  struct flock lock;  lock.l_type = F_WRLCK;    /* read/write (exclusive versus shared) lock */  lock.l_whence = SEEK_SET; /* base for seek offsets */  lock.l_start = 0;         /* 1st byte in file */  lock.l_len = 0;           /* 0 here means 'until EOF' */  lock.l_pid = getpid();    /* process id */  int fd; /* file descriptor to identify a file within a process */  if ((fd = open(FileName, O_RDWR | O_CREAT, 0666)) < 0)  /* -1 signals an error */    report_and_exit("open failed...");  if (fcntl(fd, F_SETLK, &lock) < 0) /** F_SETLK doesn't block, F_SETLKW does **/    report_and_exit("fcntl failed to get lock...");  else {    write(fd, DataString, strlen(DataString)); /* populate data file */    fprintf(stderr, "Process %d has written to data file...\n", lock.l_pid);  }  /* Now release the lock explicitly. */  lock.l_type = F_UNLCK;  if (fcntl(fd, F_SETLK, &lock) < 0)    report_and_exit("explicit unlocking failed...");  close(fd); /* close the file: would unlock if needed */  return 0;  /* terminating the process would unlock as well */}

上面生產者程式的主要步驟可以總結如下:

  • 這個程式首先宣告了一個型別為 struct flock 的變數,它代表一個鎖,並對它的 5 個域做了初始化。第一個初始化

    lock.l_type = F_WRLCK; /* exclusive lock */

    使得這個鎖為排斥鎖(read-write)而不是一個共用鎖(read-only)。假如生產者獲得了這個鎖,則其他的進程將不能夠對檔案做讀或者寫操作,直到生產者釋放了這個鎖,或者顯式地呼叫 fcntl,又或者隱式地關閉這個檔案。(當進程終止時,所有被它開啟的檔案都會被自動關閉,從而釋放了鎖)

  • 上面的程式接著初始化其他的域。主要的效果是整個檔案都將被鎖上。但是,有關鎖的 API 允許特別指定的位元組被上鎖。例如,假如檔案包含多個文字記錄,則單個記錄(或者甚至一個記錄的一部分)可以被鎖,而其餘部分不被鎖。

  • 第一次呼叫 fcntl

    if (fcntl(fd, F_SETLK, &lock) < 0)

    嘗試排斥性地將檔案鎖住,並檢查呼叫是否成功。一般來說, fcntl 函數返回 -1 (因此小於 0)意味著失敗。第二個引數 F_SETLK 意味著 fcntl 的呼叫不是堵塞的;函數立即做返回,要麼獲得鎖,要麼顯示失敗了。假如替換地使用 F_SETLKW(末尾的 W 代指等待),那麼對 fcntl 的呼叫將是阻塞的,直到有可能獲得鎖的時候。在呼叫 fcntl 函數時,它的第一個引數 fd 指的是檔案描述符,第二個引數指定了將要採取的動作(在這個例子中,F_SETLK 指代設定鎖),第三個引數為鎖結構的地址(在本例中,指的是 &lock)。

  • 假如生產者獲得了鎖,這個程式將向檔案寫入兩個文字記錄。

  • 在向檔案寫入內容後,生產者改變鎖結構中的 l_type 域為 unlock 值:

    lock.l_type = F_UNLCK;

    並呼叫 fcntl 來執行解鎖操作。最後程式關閉了檔案並退出。

範例 2. 消費者程式

#include <stdio.h>#include <stdlib.h>#include <fcntl.h>#include <unistd.h>#define FileName "data.dat"void report_and_exit(const char* msg) {  perror(msg);  exit(-1); /* EXIT_FAILURE */}int main() {  struct flock lock;  lock.l_type = F_WRLCK;    /* read/write (exclusive) lock */  lock.l_whence = SEEK_SET; /* base for seek offsets */  lock.l_start = 0;         /* 1st byte in file */  lock.l_len = 0;           /* 0 here means 'until EOF' */  lock.l_pid = getpid();    /* process id */  int fd; /* file descriptor to identify a file within a process */  if ((fd = open(FileName, O_RDONLY)) < 0)  /* -1 signals an error */    report_and_exit("open to read failed...");  /* If the file is write-locked, we can't continue. */  fcntl(fd, F_GETLK, &lock); /* sets lock.l_type to F_UNLCK if no write lock */  if (lock.l_type != F_UNLCK)    report_and_exit("file is still write locked...");  lock.l_type = F_RDLCK; /* prevents any writing during the reading */  if (fcntl(fd, F_SETLK, &lock) < 0)    report_and_exit("can't get a read-only lock...");  /* Read the bytes (they happen to be ASCII codes) one at a time. */  int c; /* buffer for read bytes */  while (read(fd, &c, 1) > 0)    /* 0 signals EOF */    write(STDOUT_FILENO, &c, 1); /* write one byte to the standard output */  /* Release the lock explicitly. */  lock.l_type = F_UNLCK;  if (fcntl(fd, F_SETLK, &lock) < 0)    report_and_exit("explicit unlocking failed...");  close(fd);  return 0;}

相比於鎖的 API,消費者程式會相對複雜一點兒。特別的,消費者程式首先檢查檔案是否被排斥性的被鎖,然後才嘗試去獲得一個共用鎖。相關的程式碼為:

lock.l_type = F_WRLCK;...fcntl(fd, F_GETLK, &lock); /* sets lock.l_type to F_UNLCK if no write lock */if (lock.l_type != F_UNLCK)  report_and_exit("file is still write locked...");

fcntl 呼叫中的 F_GETLK 操作指定檢查一個鎖,在本例中,上面程式碼的宣告中給了一個 F_WRLCK 的排斥鎖。假如特指的鎖不存在,那麼 fcntl 呼叫將會自動地改變鎖型別域為 F_UNLCK 以此來顯示當前的狀態。假如檔案是排斥性地被鎖,那麼消費者將會終止。(一個更健壯的程式版本或許應該讓消費者會兒,然後再嘗試幾次。)

假如當前檔案沒有被鎖,那麼消費者將嘗試獲取一個共用(read-only)鎖(F_RDLCK)。為了縮短程式,fcntl 中的 F_GETLK 呼叫可以丟棄,因為假如其他進程已經保有一個讀寫鎖,F_RDLCK 的呼叫就可能會失敗。重新呼叫一個唯讀鎖能夠阻止其他進程向檔案進行寫的操作,但可以允許其他進程對檔案進行讀取。簡而言之,共用鎖可以被多個進程所保有。在獲取了一個共用鎖後,消費者程式將立即從檔案中讀取位元組資料,然後在標準輸出中列印這些位元組的內容,接著釋放鎖,關閉檔案並終止。

下面的 % 為命令列提示符,下面展示的是從相同終端開啟這兩個程式的輸出:

% ./producerProcess 29255 has written to data file...% ./consumerNow is the winter of our discontentMade glorious summer by this sun of York

在本次的程式碼範例中,通過 IPC 傳輸的資料是文字:它們來自莎士比亞的戲劇《理查三世》中的兩行台詞。然而,共用檔案的內容還可以是紛繁複雜的,任意的位元組資料(例如一個電影)都可以,這使得檔案共用變成了一個非常靈活的 IPC 機制。但它的缺點是檔案獲取速度較慢,因為檔案的獲取涉及到讀或者寫。同往常一樣,程式設計總是伴隨著折中。下面的例子將通過共用記憶體來做 IPC,而不是通過共用檔案,在效能上相應的有極大的提升。

共用記憶體

對於共用記憶體,Linux 系統提供了兩類不同的 API:傳統的 System V API 和更新一點的 POSIX API。在單個應用中,這些 API 不能混用。但是,POSIX 方式的一個壞處是它的特性仍在發展中,並且依賴於安裝的核心版本,這非常影響程式碼的可移植性。例如,預設情況下,POSIX API 用記憶體對映檔案來實現共用記憶體:對於一個共用的記憶體段,系統為相應的內容維護一個備份檔案。在 POSIX 規範下共用記憶體可以被設定為不需要備份檔案,但這可能會影響可移植性。我的例子中使用的是帶有備份檔案的 POSIX API,這既結合了記憶體獲取的速度優勢,又獲得了檔案儲存的永續性。

下面的共用記憶體例子中包含兩個程式,分別名為 memwritermemreader,並使用號誌來調整它們對共用記憶體的獲取。在任何時候當共用記憶體進入一個寫入者場景時,無論是多進程還是多執行緒,都有遇到基於記憶體的競爭條件的風險,所以,需要引入號誌來協調(同步)對共用記憶體的獲取。

memwriter 程式應當在它自己所處的終端首先啟動,然後 memreader 程式才可以在它自己所處的終端啟動(在接著的十幾秒內)。memreader 的輸出如下:

This is the way the world ends...

在每個源程式的最上方註釋部分都解釋了在編譯它們時需要新增的連結引數。

首先讓我們複習一下號誌是如何作為一個同步機制工作的。一般的號誌也被叫做一個計數號誌,因為帶有一個可以增加的值(通常初始化為 0)。考慮一家租用自行車的商店,在它的庫存中有 100 輛自行車,還有一個供職員用於租賃的程式。每當一輛自行車被租出去,號誌就增加 1;當一輛自行車被還回來,號誌就減 1。在號誌的值為 100 之前都還可以進行租賃業務,但如果等於 100 時,就必須停止業務,直到至少有一輛自行車被還回來,從而號誌減為 99。

二元號誌是一個特例,它只有兩個值:0 和 1。在這種情況下,號誌的表現為互斥量(一個互斥的構造)。下面的共用記憶體範例將把號誌用作互斥量。當號誌的值為 0 時,只有 memwriter 可以獲取共用記憶體,在寫操作完成後,這個進程將增加號誌的值,從而允許 memreader 來讀取共用記憶體。

範例 3. memwriter 進程的源程式

/** Compilation: gcc -o memwriter memwriter.c -lrt -lpthread **/#include <stdio.h>#include <stdlib.h>#include <sys/mman.h>#include <sys/stat.h>#include <fcntl.h>#include <unistd.h>#include <semaphore.h>#include <string.h>#include "shmem.h"void report_and_exit(const char* msg) {  perror(msg);  exit(-1);}int main() {  int fd = shm_open(BackingFile,      /* name from smem.h */                    O_RDWR | O_CREAT, /* read/write, create if needed */                    AccessPerms);     /* access permissions (0644) */  if (fd < 0) report_and_exit("Can't open shared mem segment...");  ftruncate(fd, ByteSize); /* get the bytes */  caddr_t memptr = mmap(NULL,       /* let system pick where to put segment */                        ByteSize,   /* how many bytes */                        PROT_READ | PROT_WRITE, /* access protections */                        MAP_SHARED, /* mapping visible to other processes */                        fd,         /* file descriptor */                        0);         /* offset: start at 1st byte */  if ((caddr_t) -1  == memptr) report_and_exit("Can't get segment...");  fprintf(stderr, "shared mem address: %p [0..%d]\n", memptr, ByteSize - 1);  fprintf(stderr, "backing file:       /dev/shm%s\n", BackingFile );  /* semaphore code to lock the shared mem */  sem_t* semptr = sem_open(SemaphoreName, /* name */                           O_CREAT,       /* create the semaphore */                           AccessPerms,   /* protection perms */                           0);            /* initial value */  if (semptr == (void*) -1) report_and_exit("sem_open");  strcpy(memptr, MemContents); /* copy some ASCII bytes to the segment */  /* increment the semaphore so that memreader can read */  if (sem_post(semptr) < 0) report_and_exit("sem_post");  sleep(12); /* give reader a chance */  /* clean up */  munmap(memptr, ByteSize); /* unmap the storage */  close(fd);  sem_close(semptr);  shm_unlink(BackingFile); /* unlink from the backing file */  return 0;}

下面是 memwritermemreader 程式如何通過共用記憶體來通訊的一個總結:

  • 上面展示的 memwriter 程式呼叫 shm_open 函數來得到作為系統協調共用記憶體的備份檔案的檔案描述符。此時,並沒有記憶體被分配。接下來呼叫的是令人誤解的名為 ftruncate 的函數

    ftruncate(fd, ByteSize); /* get the bytes */

    它將分配 ByteSize 位元組的記憶體,在該情況下,一般為大小適中的 512 位元組。memwritermemreader 程式都只從共用記憶體中獲取資料,而不是從備份檔案。系統將負責共用記憶體和備份檔案之間資料的同步。

  • 接著 memwriter 呼叫 mmap 函數:

    caddr_t memptr = mmap(NULL, /* let system pick where to put segment */                  ByteSize, /* how many bytes */                  PROT_READ | PROT_WRITE, /* access protections */                  MAP_SHARED, /* mapping visible to other processes */                  fd, /* file descriptor */                  0); /* offset: start at 1st byte */

    來獲得共用記憶體的指標。(memreader 也做一次類似的呼叫。) 指標型別 caddr_tc 開頭,它代表 calloc,而這是動態初始化分配的記憶體為 0 的一個系統函數。memwriter 通過庫函數 strcpy(字串複製)來獲取後續操作的 memptr

  • 到現在為止,memwriter 已經準備好進行寫操作了,但首先它要建立一個號誌來確保共用記憶體的排斥性。假如 memwriter 正在執行寫操作而同時 memreader 在執行讀操作,則有可能出現競爭條件。假如呼叫 sem_open 成功了:

    sem_t* semptr = sem_open(SemaphoreName, /* name */                     O_CREAT, /* create the semaphore */                     AccessPerms, /* protection perms */                     0); /* initial value */

    那麼,接著寫操作便可以執行。上面的 SemaphoreName(任意一個唯一的非空名稱)用來在 memwritermemreader 識別號誌。初始值 0 將會傳遞給號誌的建立者,在這個例子中指的是 memwriter 賦予它執行操作的權利。

  • 在寫操作完成後,memwriter* 通過呼叫sem_post` 函數將號誌的值增加到 1:

    if (sem_post(semptr) < 0) ..

    增加信號了將釋放互斥鎖,使得 memreader 可以執行它的操作。為了更好地測量,memwriter 也將從它自己的地址空間中取消對映,

    munmap(memptr, ByteSize); /* unmap the storage *

    這將使得 memwriter 不能進一步地存取共用記憶體。

範例 4. memreader 進程的原始碼

/** Compilation: gcc -o memreader memreader.c -lrt -lpthread **/#include <stdio.h>#include <stdlib.h>#include <sys/mman.h>#include <sys/stat.h>#include <fcntl.h>#include <unistd.h>#include <semaphore.h>#include <string.h>#include "shmem.h"void report_and_exit(const char* msg) {  perror(msg);  exit(-1);}int main() {  int fd = shm_open(BackingFile, O_RDWR, AccessPerms);  /* empty to begin */  if (fd < 0) report_and_exit("Can't get file descriptor...");  /* get a pointer to memory */  caddr_t memptr = mmap(NULL,       /* let system pick where to put segment */                        ByteSize,   /* how many bytes */                        PROT_READ | PROT_WRITE, /* access protections */                        MAP_SHARED, /* mapping visible to other processes */                        fd,         /* file descriptor */                        0);         /* offset: start at 1st byte */  if ((caddr_t) -1 == memptr) report_and_exit("Can't access segment...");  /* create a semaphore for mutual exclusion */  sem_t* semptr = sem_open(SemaphoreName, /* name */                           O_CREAT,       /* create the semaphore */                           AccessPerms,   /* protection perms */                           0);            /* initial value */  if (semptr == (void*) -1) report_and_exit("sem_open");  /* use semaphore as a mutex (lock) by waiting for writer to increment it */  if (!sem_wait(semptr)) { /* wait until semaphore != 0 */    int i;    for (i = 0; i < strlen(MemContents); i++)      write(STDOUT_FILENO, memptr + i, 1); /* one byte at a time */    sem_post(semptr);  }  /* cleanup */  munmap(memptr, ByteSize);  close(fd);  sem_close(semptr);  unlink(BackingFile);  return 0;}

memwritermemreader 程式中,共用記憶體的主要著重點都在 shm_openmmap 函數上:在成功時,第一個呼叫返回一個備份檔案的檔案描述符,而第二個呼叫則使用這個檔案描述符從共用記憶體段中獲取一個指標。它們對 shm_open 的呼叫都很相似,除了 memwriter 程式建立共用記憶體,而 `memreader 只獲取這個已經建立的記憶體:

int fd = shm_open(BackingFile, O_RDWR | O_CREAT, AccessPerms); /* memwriter */int fd = shm_open(BackingFile, O_RDWR, AccessPerms); /* memreader */

有了檔案描述符,接著對 mmap 的呼叫就是類似的了:

caddr_t memptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);

mmap 的第一個引數為 NULL,這意味著讓系統自己決定在虛擬記憶體地址的哪個地方分配記憶體,當然也可以指定一個地址(但很有技巧性)。MAP_SHARED 標誌著被分配的記憶體在進程中是共用的,最後一個引數(在這個例子中為 0 ) 意味著共用記憶體的偏移量應該為第一個位元組。size 引數特別指定了將要分配的位元組數目(在這個例子中是 512);另外的保護引數(AccessPerms)暗示著共用記憶體是可讀可寫的。

memwriter 程式執行成功後,系統將建立並維護備份檔案,在我的系統中,該檔案為 /dev/shm/shMemEx,其中的 shMemEx 是我為共用儲存命名的(在標頭檔案 shmem.h 中給定)。在當前版本的 memwritermemreader 程式中,下面的語句

shm_unlink(BackingFile); /* removes backing file */

將會移除備份檔案。假如沒有 unlink 這個語句,則備份檔案在程式終止後仍然持久地儲存著。

memreadermemwriter 一樣,在呼叫 sem_open 函數時,通過號誌的名字來獲取號誌。但 memreader 隨後將進入等待狀態,直到 memwriter 將初始值為 0 的號誌的值增加。

if (!sem_wait(semptr)) { /* wait until semaphore != 0 */

一旦等待結束,memreader 將從共用記憶體中讀取 ASCII 資料,然後做些清理工作並終止。

共用記憶體 API 包括顯式地同步共用記憶體段和備份檔案。在這次的範例中,這些操作都被省略了,以免文章顯得雜亂,好讓我們專注於記憶體共用和號誌的程式碼。

即便在號誌程式碼被移除的情況下,memwritermemreader 程式很大幾率也能夠正常執行而不會引入競爭條件:memwriter 建立了共用記憶體段,然後立即向它寫入;memreader 不能存取共用記憶體,直到共用記憶體段被建立好。然而,當一個寫操作處於混合狀態時,最佳實踐需要共用記憶體被同步。號誌 API 足夠重要,值得在程式碼範例中著重強調。

總結

上面共用檔案和共用記憶體的例子展示了進程是怎樣通過共用儲存來進行通訊的,前者通過檔案而後者通過記憶體塊。這兩種方法的 API 相對來說都很直接。這兩種方法有什麼共同的缺點嗎?現代的應用經常需要處理流資料,而且是非常大規模的資料流。共用檔案或者共用記憶體的方法都不能很好地處理大規模的流資料。按照型別使用管道會更加合適一些。所以這個系列的第二部分將會介紹管道和訊息佇列,同樣的,我們將使用 C 語言寫的程式碼範例來輔助講解。