訊息佇列


為什麼已經擁有了共用記憶體時需要訊息佇列呢? 這將是多種原因,讓我們將其分解為多個點來簡化 -

  • 據了解,一旦訊息被一個進程接收到,它將不再可用於任何其他進程。 而在共用記憶體中,資料可供多個進程存取。
  • 如果想使用小資訊格式進行通訊。
  • 當多個進程同時進行通訊時,共用記憶體資料需要同步保護。
  • 使用共用記憶體的寫入和讀取頻率很高,那麼實現功能將會非常複雜。 在這種情況下不值得使用。
  • 如果所有的進程不需要存取共用記憶體,但是很少的進程只需要它,那麼用訊息佇列實現會更好。
  • 如果想要與不同的資料包進行通訊,比如進程A正在傳送訊息型別1給進程B,訊息型別10給進程C,訊息型別20給進程D。在這種情況下,用訊息佇列實現是比較簡單的。 為了將給定的訊息型別簡化為1,10,20,它可以是0+ve-ve,如下所述。
  • 當然,訊息佇列的順序是FIFO(先進先出)。 插入到佇列中的第一條訊息是第一條要檢索的訊息。

使用共用記憶體或訊息佇列取決於應用程式的需要以及如何有效地使用它。

使用訊息佇列的通訊可以通過以下方式進行 -

  • 通過一個進程寫入共用記憶體,並由另一個進程讀取共用記憶體。 正如我們所知道的,讀取也可以用多個進程來完成。
  • 通過一個進程用不同的資料包寫入共用儲存器,並通過多個進程,即按照訊息型別讀出。

    看到訊息佇列中的某些資訊後,現在是檢查支援訊息佇列的系統呼叫(System V)的時候了。

要使用訊息佇列執行通訊,請執行以下步驟 -

第1步 - 建立一個訊息佇列或連線到一個已經存在的訊息佇列(msgget())
第2步 - 寫入訊息佇列(msgsnd())
第3步 - 從訊息佇列中讀取(msgrcv())
第4步 - 對訊息佇列(msgctl())執行控制操作

現在,讓我們看看上述呼叫的語法和某些資訊。

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgget(key_t key, int msgflg)

這個系統呼叫建立或分配一個System V訊息佇列。需要傳遞以下引數 -

  • 第一個引數key用於識別訊息佇列。key可以是任意值,也可以是來自庫函式ftok()的值。
  • 第二個引數shmflg指定所需的訊息佇列標誌,例如IPC_CREAT(如果不存在則建立訊息佇列)或IPC_EXCL(與IPC_CREAT一起用於建立訊息佇列,如果訊息佇列已經存在,則呼叫失敗)。 還需要傳遞許可權。

註 - 有關許可權的詳細資訊,請參閱前面幾節。

這個呼叫會在成功時返回一個有效的訊息佇列識別符號(用於進一步呼叫訊息佇列),在失敗的情況下返回-1。 要知道失敗的原因,請檢查errno變數或perror()函式。

關於這個呼叫的各種錯誤是EACCESS(許可權被拒絕),EEXIST(佇列已經存在不能建立),ENOENT(佇列不存在),ENOMEM(沒有足夠的記憶體來建立佇列)等

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgsnd(int msgid, const void *msgp, size_t msgsz, int msgflg)

此系統呼叫將訊息傳送/附加到訊息佇列(System V)中。 需要傳遞以下引數 -

  • 第一個引數msgid識別訊息佇列,即訊息佇列識別符號。 msgget()成功時收到識別符號的值
  • 第二個引數msgp是傳送給呼叫者的訊息的指標,定義在以下形式的結構中 -

    struct msgbuf {
     long mtype;
     char mtext[1];
    };
    

    變數mtype用於與不同的訊息型別進行通訊,在msgrcv()呼叫中詳細解釋。 變數mtext是一個陣列或其他大小由msgsz(正值)指定的結構。 如果沒有提到mtext欄位,則將其視為0大小訊息,這是允許的。

  • 第三個引數msgsz是訊息的大小(訊息應該以空字元結尾)

  • 第四個引數msgflg表示某些標誌,例如IPC_NOWAIT(當在佇列中找不到訊息時立即返回)或MSG_NOERROR(截斷訊息文字,如果超過msgsz位元組)

這個呼叫在成功時將返回0,在失敗的情況下為-1。 要知道失敗的原因,請檢查errno變數或perror()函式。

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgrcv(int msgid, const void *msgp, size_t msgsz, long msgtype, int msgflg)

該系統呼叫從訊息佇列(系統V)中檢索訊息。 需要傳遞以下引數 -

  • 第一個引數msgid識別訊息佇列,即訊息佇列識別符號。 msgget()成功時收到識別符號的值
  • 第二個引數msgp是從呼叫者接收的訊息的指標。 它在以下形式的結構中被定義 -
    struct msgbuf {
     long mtype;
     char mtext[1];
    };
    
    變數mtype用於與不同的訊息型別進行通訊。 變數mtext是一個陣列或其他大小由msgsz(正值)指定的結構。 如果沒有提到mtext欄位,則將其視為0大小訊息,這是允許的。
  • 第三個引數msgsz是收到的訊息的大小(訊息應該以空字元結尾)
  • 第四個引數msgtype表示訊息的型別 -
    • 如果msgtype0 - 讀取佇列中的第一個收到的訊息。
    • 如果msgtype+ve - 讀取型別為msgtype的佇列中的第一條訊息(如果msgtype10,則唯讀取型別10的第一條訊息,即使其他型別可能位於佇列中的開頭)
    • 如果msgtype-ve - 讀取小於或等於訊息型別的絕對值的最小型別的第一個訊息(例如,如果msgtype-5,則它讀取型別小於5的第一個訊息,即訊息型別從15)
  • 第五個引數msgflg表示某些標誌,例如IPC_NOWAIT(當佇列中沒有訊息時立即返回,或MSG_NOERROR(如果超過了msgsz位元組則截斷訊息文字)

這個呼叫將返回成功時在mtext陣列中實際接收的位元組數,在失敗的情況下返回-1。 要知道失敗的原因,請檢查errno變數或perror()函式。

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgctl(int msgid, int cmd, struct msqid_ds *buf)

這個系統呼叫執行訊息佇列(系統V)的控制操作。 需要傳遞以下引數 -

  • 第一個引數msgid識別訊息佇列,即訊息佇列識別符號。 msgget()成功時收到識別符號的值
  • 第二個引數cmd是對訊息佇列執行所需控制操作的命令。 cmd的有效值是 -
    • IPC_STAT - 將struct msqid_ds的每個成員的當前值的資訊複製到由buf指向的傳遞結構中。 該命令需要訊息佇列的讀取許可權。
    • IPC_SET - 設定結構buf指向的使用者ID,所有者的組ID,許可權等。
    • IPC_RMID - 立即刪除訊息佇列。
    • IPC_INFO - 返回有關buf指向的結構中的訊息佇列限制和引數的資訊,該結構的型別為struct msginfo
    • MSG_INFO - 返回一個msginfo結構,其中包含有關訊息佇列消耗的系統資源的資訊。
  • 第三個引數buf是一個指向名為struct msqid_ds的訊息佇列結構的指標。 這個結構的值將被用於任一集或者按照cmd得到。

這個呼叫將根據傳遞的命令返回值。 IPC_INFO和MSG_INFO或MSG_STAT的成功返回訊息佇列的索引或識別符號,其他操作返回0,失敗時返回-1。 要知道失敗的原因,請檢查errno變數或perror()函式。

上面已經看到有關訊息佇列的基本資訊和系統呼叫,現在是時候來看看程式程式碼了。

讓我們看看這個程式實現的描述 -

第1步 - 建立兩個進程,一個用於傳送到訊息佇列(msgq_send.c),另一個用於從訊息佇列(msgq_recv.c)
第2步 - 使用ftok()函式建立鍵(Key)。 為此,最初建立檔案msgq.txt以獲取唯一的鍵。
第3步 - 傳送過程執行以下操作。

  • 讀取使用者輸入的字串
  • 刪除新行,如果存在
  • 傳送到訊息佇列
  • 重複這個過程直到輸入結束(CTRL + D)
  • 一旦收到輸入結束,傳送訊息「end」來表示進程結束。

第4步 - 在接收過程中,執行以下操作。

  • 從佇列中讀取訊息
  • 顯示輸出
  • 如果收到的訊息是「end」,則結束該過程並退出。

為了簡化,我們沒有使用這個範例的訊息型別。 另外,一個進程正在寫入佇列,另一個進程正在從佇列中讀取。 這可以根據需要進行擴充套件,即理想情況下一個進程將寫入佇列中,多個進程從佇列中讀取。

現在,讓我們看看一下進程(訊息傳送到佇列) - 檔案:msgq_send.c

/* Filename: msgq_send.c */
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define PERMS 0644
struct my_msgbuf {
   long mtype;
   char mtext[200];
};

int main(void) {
   struct my_msgbuf buf;
   int msqid;
   int len;
   key_t key;
   system("touch msgq.txt");

   if ((key = ftok("msgq.txt", 'B')) == -1) {
      perror("ftok");
      exit(1);
   }

   if ((msqid = msgget(key, PERMS | IPC_CREAT)) == -1) {
      perror("msgget");
      exit(1);
   }
   printf("message queue: ready to send messages.\n");
   printf("Enter lines of text, ^D to quit:\n");
   buf.mtype = 1; /* we don't really care in this case */

   while(fgets(buf.mtext, sizeof buf.mtext, stdin) != NULL) {
      len = strlen(buf.mtext);
      /* remove newline at end, if it exists */
      if (buf.mtext[len-1] == '\n') buf.mtext[len-1] = '\0';
      if (msgsnd(msqid, &buf, len+1, 0) == -1) /* +1 for '\0' */
      perror("msgsnd");
   }
   strcpy(buf.mtext, "end");
   len = strlen(buf.mtext);
   if (msgsnd(msqid, &buf, len+1, 0) == -1) /* +1 for '\0' */
   perror("msgsnd");

   if (msgctl(msqid, IPC_RMID, NULL) == -1) {
      perror("msgctl");
      exit(1);
   }
   printf("message queue: done sending messages.\n");
   return 0;
}

執行上面範例程式碼,得到以下輸出結果 -

message queue: ready to send messages.
Enter lines of text, ^D to quit:
this is line 1
this is line 2
message queue: done sending messages.

以下是來自訊息接收過程的程式碼(從佇列中檢索訊息) - 檔案:msgq_recv.c -

/* Filename: msgq_recv.c */
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define PERMS 0644
struct my_msgbuf {
   long mtype;
   char mtext[200];
};

int main(void) {
   struct my_msgbuf buf;
   int msqid;
   int toend;
   key_t key;

   if ((key = ftok("msgq.txt", 'B')) == -1) {
      perror("ftok");
      exit(1);
   }

   if ((msqid = msgget(key, PERMS)) == -1) { /* connect to the queue */
      perror("msgget");
      exit(1);
   }
   printf("message queue: ready to receive messages.\n");

   for(;;) { /* normally receiving never ends but just to make conclusion 
             /* this program ends wuth string of end */
      if (msgrcv(msqid, &buf, sizeof(buf.mtext), 0, 0) == -1) {
         perror("msgrcv");
         exit(1);
      }
      printf("recvd: \"%s\"\n", buf.mtext);
      toend = strcmp(buf.mtext,"end");
      if (toend == 0)
      break;
   }
   printf("message queue: done receiving messages.\n");
   system("rm msgq.txt");
   return 0;
}

執行上面範例程式碼,得到以下輸出結果 -

message queue: ready to receive messages.
recvd: "this is line 1"
recvd: "this is line 2"
recvd: "end"
message queue: done receiving messages.