一文聊聊Node中的程序間通訊

2022-09-05 22:00:36
程序間怎麼進行通訊?下面本篇文章給大家介紹一下程序間通訊的原理,希望對大家有所幫助!

前端(vue)入門到精通課程:進入學習

前置知識

檔案描述符

在 Linux 系統中,一切都看成檔案,當程序開啟現有檔案時,會返回一個檔案描述符。 檔案描述符是作業系統為了管理已經被程序開啟的檔案所建立的索引,用來指向被開啟的檔案。 當我們的程序啟動之後,作業系統會給每一個程序分配一個 PCB 控制塊,PCB 中會有一個檔案描述符表,存放當前程序所有的檔案描述符,即當前程序開啟的所有檔案。

? 程序中的檔案描述符是如何和系統檔案對應起來的? 在核心中,系統會維護另外兩種表

  • 開啟檔案表(Open file table)
  • i-node 表(i-node table)

檔案描述符就是陣列的下標,從0開始往上遞增,0/1/2 預設是我們的輸入/輸出/錯誤流的檔案描述符 在 PCB 中維護的檔案描述表中,可以根據檔案描述符找到對應了檔案指標,找到對應的開啟檔案表 開啟檔案表中維護了:檔案偏移量(讀寫檔案的時候會更新);對於檔案的狀態標識;指向 i-node 表的指標 想要真正的操作檔案,還得靠 i-node 表,能夠獲取到真實檔案的相關資訊

他們之間的關係

1.png

圖解

  • 在程序 A 中,檔案描述符1/20均指向了同一開啟檔案表項23,這可能是對同一檔案多次呼叫了 open 函數形成的
  • 程序 A/B 的檔案描述符2都指向同一檔案,這可能是呼叫了 fork 建立子程序,A/B 是父子關係程序
  • 程序 A 的檔案描述符0和程序 B 的檔案描述符指向了不同的開啟檔案表項,但這些表項指向了同一個檔案,這可能是 A/B 程序分別對同一檔案發起了 open 呼叫

總結

  • 同一程序的不同檔案描述符可以指向同一個檔案
  • 不同程序可以擁有相同的檔案描述符
  • 不同程序的同一檔案描述符可以指向不同的檔案
  • 不同程序的不同檔案描述符可以指向同一個檔案

檔案描述符的重定向

每次讀寫程序的時候,都是從檔案描述符下手,找到對應的開啟檔案表項,再找到對應的 i-node 表

?如何實現檔案描述符重定向? 因為在檔案描述符表中,能夠找到對應的檔案指標,如果我們改變了檔案指標,是不是後續的兩個表內容就發生了改變 例如:檔案描述符1指向的顯示器,那麼將檔案描述符1指向 log.txt 檔案,那麼檔案描述符 1 也就和 log.txt 對應起來了

shell 對檔案描述符的重定向

> 是輸出重定向符號,< 是輸入重定向符號,它們是檔案描述符操作符 > 和 < 通過修改檔案描述符改變了檔案指標的指向,來能夠實現重定向的功能

我們使用cat hello.txt時,預設會將結果輸出到顯示器上,使用 > 來重定向。cat hello.txt 1 > log.txt以輸出的方式開啟檔案 log.txt,並繫結到檔案描述符1上

2.png

c函數對檔案描述符的重定向

dup

dup 函數是用來開啟一個新的檔案描述符,指向和 oldfd 同一個檔案,共用檔案偏移量和檔案狀態

int main(int argc, char const *argv[])
{
    int fd = open("log.txt");
    int copyFd = dup(fd);
    //將fd閱讀檔案置於檔案末尾,計算偏移量。
    cout << "fd = " << fd << " 偏移量: " << lseek(fd, 0, SEEK_END) << endl;
    //現在我們計算copyFd的偏移量
    cout << "copyFd = " << copyFd << "偏移量:" << lseek(copyFd, 0, SEEK_CUR) << endl;
    return 0;
}

3.png

呼叫 dup(3) 的時候,會開啟新的最小描述符,也就是4,這個4指向了3所指向的檔案,操作任意一個 fd 都是修改的一個檔案

dup2

dup2 函數,把指定的 newfd 也指向 oldfd 指向的檔案。執行完dup2之後,newfd 和 oldfd 同時指向同一個檔案,共用檔案偏移量和檔案狀態

int main(int argc, char const *argv[])
{
    int fd = open("log.txt");
    int copyFd = dup(fd);
    //將fd閱讀檔案置於檔案末尾,計算偏移量。
    cout << "fd = " << fd << " 偏移量: " << lseek(fd, 0, SEEK_END) << endl;
    //現在我們計算copyFd的偏移量
    cout << "copyFd = " << copyFd << "偏移量:" << lseek(copyFd, 0, SEEK_CUR) << endl;
    return 0;
}

4.png

Node中通訊原理

Node 中的 IPC 通道具體實現是由 libuv 提供的。根據系統的不同實現方式不同,window 下采用命名管道實現,*nix 下采用 Domain Socket 實現。在應用層只體現為 message 事件和 send 方法。【相關教學推薦:】

5.png

父程序在實際建立子程序之前,會建立 IPC 通道並監聽它,等到建立出真實的子程序後,通過環境變數(NODE_CHANNEL_FD)告訴子程序該 IPC 通道的檔案描述符。

子程序在啟動的過程中,會根據該檔案描述符去連線 IPC 通道,從而完成父子程序的連線。

建立連線之後可以自由的通訊了,IPC 通道是使用命名管道或者 Domain Socket 建立的,屬於雙向通訊。並且它是在系統核心中完成的程序通訊

6.png

⚠️ 只有在啟動的子程序是 Node 程序時,子程序才會根據環境變數去連線對應的 IPC 通道,對於其他型別的子程序則無法實現程序間通訊,除非其他程序也按著該約定去連線這個 IPC 通道。

unix domain socket

是什麼

我們知道經典的通訊方式是有 Socket,我們平時熟知的 Socket 是基於網路協定的,用於兩個不同主機上的兩個程序通訊,通訊需要指定 IP/Host 等。 但如果我們同一臺主機上的兩個程序想要通訊,如果使用 Socket 需要指定 IP/Host,經過網路協定等,會顯得過於繁瑣。所以 Unix Domain Socket 誕生了。

UDS 的優勢:

  • 繫結 socket 檔案而不是繫結 IP/Host;不需要經過網路協定,而是資料的拷貝
  • 也支援 SOCK_STREAM(流通訊端)和 SOCK_DGRAM(封包通訊端),但由於是在本機通過核心通訊,不會丟包也不會出現傳送包的次序和接收包的次序不一致的問題

如何實現

流程圖

7.png

Server 端
int main(int argc, char *argv[])
{
    int server_fd ,ret, client_fd;
    struct sockaddr_un serv, client;
    socklen_t len = sizeof(client);
    char buf[1024] = {0};
    int recvlen;

    // 建立 socket
    server_fd = socket(AF_LOCAL, SOCK_STREAM, 0);

    // 初始化 server 資訊
    serv.sun_family = AF_LOCAL;
    strcpy(serv.sun_path, "server.sock");

    // 繫結
    ret = bind(server_fd, (struct sockaddr *)&serv, sizeof(serv));

    //設定監聽,設定能夠同時和伺服器端連線的使用者端數量
    ret = listen(server_fd, 36);

    //等待使用者端連線
    client_fd = accept(server_fd, (struct sockaddr *)&client, &len);
    printf("=====client bind file:%s\n", client.sun_path);

    while (1) {
        recvlen = recv(client_fd, buf, sizeof(buf), 0);
        if (recvlen == -1) {
            perror("recv error");
            return -1;
        } else if (recvlen == 0) {
            printf("client disconnet...\n");
            close(client_fd);
            break;
        } else {
            printf("recv buf %s\n", buf);
            send(client_fd, buf, recvlen, 0);
        }
    }

    close(client_fd);
    close(server_fd);
    return 0;
}
Client 端
int main(int argc, char *argv[])
{
    int client_fd ,ret;
    struct sockaddr_un serv, client;
    socklen_t len = sizeof(client);
    char buf[1024] = {0};
    int recvlen;

    //建立socket
    client_fd = socket(AF_LOCAL, SOCK_STREAM, 0);

    //給使用者端繫結一個通訊端檔案
    client.sun_family = AF_LOCAL;
    strcpy(client.sun_path, "client.sock");
    ret = bind(client_fd, (struct sockaddr *)&client, sizeof(client));

    //初始化server資訊
    serv.sun_family = AF_LOCAL;
    strcpy(serv.sun_path, "server.sock");
    //連線
    connect(client_fd, (struct sockaddr *)&serv, sizeof(serv));

    while (1) {
        fgets(buf, sizeof(buf), stdin);
        send(client_fd, buf, strlen(buf)+1, 0);

        recv(client_fd, buf, sizeof(buf), 0);
        printf("recv buf %s\n", buf);
    }

    close(client_fd);
    return 0;
}

命名管道(Named Pipe)

是什麼

命名管道是可以在同一臺計算機的不同程序之間,或者跨越一個網路的不同計算機的不同程序之間的可靠的單向或者雙向的資料通訊。 建立命名管道的程序被稱為管道伺服器端(Pipe Server),連線到這個管道的程序稱為管道使用者端(Pipe Client)。

命名管道的命名規範:\server\pipe[\path]\name

  • 其中 server 指定一個伺服器的名字,本機適用 \. 表示,\192.10.10.1 表示網路上的伺服器
  • \pipe 是一個不可變化的字串,用於指定該檔案屬於 NPFS(Named Pipe File System)
  • [\path]\name 是唯一命名管道名稱的標識

怎麼實現

流程圖

8.png

Pipe Server
void ServerTest()
{
    HANDLE  serverNamePipe;
    char    pipeName[MAX_PATH] = {0};
    char    szReadBuf[MAX_BUFFER] = {0};
    char    szWriteBuf[MAX_BUFFER] = {0};
    DWORD   dwNumRead = 0;
    DWORD   dwNumWrite = 0;

    strcpy(pipeName, "\\\\.\\pipe\\shuangxuPipeTest");
    // 建立管道範例
    serverNamePipe = CreateNamedPipeA(pipeName,
        PIPE_ACCESS_DUPLEX|FILE_FLAG_WRITE_THROUGH,
        PIPE_TYPE_BYTE|PIPE_READMODE_BYTE|PIPE_WAIT,
        PIPE_UNLIMITED_INSTANCES, 0, 0, 0, NULL);
    WriteLog("建立管道成功...");
    // 等待使用者端連線
    BOOL bRt= ConnectNamedPipe(serverNamePipe, NULL );
    WriteLog( "收到使用者端的連線成功...");
    // 接收資料
    memset( szReadBuf, 0, MAX_BUFFER );
    bRt = ReadFile(serverNamePipe, szReadBuf, MAX_BUFFER-1, &dwNumRead, NULL );
    // 業務邏輯處理 (只為測試用返回原來的資料)
    WriteLog( "收到客戶資料:[%s]", szReadBuf);
    // 傳送資料
    if( !WriteFile(serverNamePipe, szWriteBuf, dwNumRead, &dwNumWrite, NULL ) )
    {
        WriteLog("向客戶寫入資料失敗:[%#x]", GetLastError());
        return ;
    }
    WriteLog("寫入資料成功...");
}
Pipe Client
void ClientTest()
{
    char    pipeName[MAX_PATH] = {0};
    HANDLE  clientNamePipe;
    DWORD   dwRet;
    char    szReadBuf[MAX_BUFFER] = {0};
    char    szWriteBuf[MAX_BUFFER] = {0};
    DWORD   dwNumRead = 0;
    DWORD   dwNumWrite = 0;

    strcpy(pipeName, "\\\\.\\pipe\\shuangxuPipeTest");
    // 檢測管道是否可用
    if(!WaitNamedPipeA(pipeName, 10000)){
        WriteLog("管道[%s]無法開啟", pipeName);
        return ;
    }
    // 連線管道
    clientNamePipe = CreateFileA(pipeName,
        GENERIC_READ|GENERIC_WRITE,
        0,
        NULL,
        OPEN_EXISTING,
        FILE_ATTRIBUTE_NORMAL,
        NULL);
    WriteLog("管道連線成功...");
    scanf( "%s", szWritebuf );
    // 傳送資料
    if( !WriteFile(clientNamePipe, szWriteBuf, strlen(szWriteBuf), &dwNumWrite, NULL)){
        WriteLog("傳送資料失敗,GetLastError=[%#x]", GetLastError());
        return ;
    }
    printf("傳送資料成功:%s\n", szWritebuf );
    // 接收資料
    if( !ReadFile(clientNamePipe, szReadBuf, MAX_BUFFER-1, &dwNumRead, NULL)){
        WriteLog("接收資料失敗,GetLastError=[%#x]", GetLastError() );
        return ;
    }
    WriteLog( "接收到伺服器返回:%s", szReadBuf );
    // 關閉管道
    CloseHandle(clientNamePipe);
}

Node 建立子程序的流程

Unix

9.png

對於建立子程序、建立管道、重定向管道均是在 c++ 層實現的

建立子程序

int main(int argc,char *argv[]){
    pid_t pid = fork();
    if (pid < 0) {
        // 錯誤
    } else if(pid == 0) {
        // 子程序
    } else {
        // 父程序
    }
}

建立管道

使用 socketpair 建立管道,其建立出來的管道是全雙工的,返回的檔案描述符中的任何一個都可讀和可寫

int main ()
{
    int fd[2];
    int r = socketpair(AF_UNIX, SOCK_STREAM, 0, fd);

    if (fork()){ /* 父程序 */
        int val = 0;
        close(fd[1]);
        while (1){
            sleep(1);
            ++val;
            printf("傳送資料: %d\n", val);
            write(fd[0], &val, sizeof(val));
            read(fd[0], &val, sizeof(val));
            printf("接收資料: %d\n", val);
        }
    } else {  /*子程序*/
        int val;
        close(fd[0]);
        while(1){
            read(fd[1], &val, sizeof(val));
            ++val;
            write(fd[1], &val, sizeof(val));
        }
    }
}

當我們使用 socketpair 建立了管道之後,父程序關閉了 fd[1],子程序關閉了 fd[0]。子程序可以通過 fd[1] 讀寫資料;同理主程序通過 fd[0]讀寫資料完成通訊。

對應程式碼:https://github.com/nodejs/node/blob/main/deps/uv/src/unix/process.c#L344

child_process.fork 的詳細呼叫

fork 函數開啟一個子程序的流程

10.png

  • 初始化引數中的 options.stdio,並且呼叫 spawn 函數

    function spawn(file, args, options) {
      const child = new ChildProcess();
    
      child.spawn(options);
    }
  • 建立 ChildProcess 範例,建立子程序也是呼叫 C++ 層 this._handle.spawn 方法

    function ChildProcess() {
    	// C++層定義
    	this._handle = new Process();
    }
  • 通過 child.spawn 呼叫到 ChildProcess.prototype.spawn 方法中。其中 getValidStdio 方法會根據 options.stdio 建立和 C++ 互動的 Pipe 物件,並獲得對應的檔案描述符,將檔案描述符寫入到環境變數 NODE_CHANNEL_FD 中,呼叫 C++ 層建立子程序,在呼叫 setupChannel 方法

    ChildProcess.prototype.spawn = function(options) {
      // 預處理程序間通訊的資料結構
    	stdio = getValidStdio(stdio, false);
    	const ipc = stdio.ipc;
        const ipcFd = stdio.ipcFd;
    	//將檔案描述符寫入環境變數中
    	if (ipc !== undefined) {
        ArrayPrototypePush(options.envPairs, `NODE_CHANNEL_FD=${ipcFd}`);
      }
    	// 建立程序
    	const err = this._handle.spawn(options);
        // 新增send方法和監聽IPC中資料
    	if (ipc !== undefined) setupChannel(this, ipc, serialization);
    }
  • 子程序啟動時,會根據環境變數中是否存在 NODE_CHANNEL_FD 判斷是否呼叫 _forkChild 方法,建立一個 Pipe 物件, 同時呼叫 open 方法開啟對應的檔案描述符,在呼叫setupChannel

    function _forkChild(fd, serializationMode) {
      const p = new Pipe(PipeConstants.IPC);
      p.open(fd);
      p.unref();
      const control = setupChannel(process, p, serializationMode);
    }

控制程式碼傳遞

setupChannel 主要是完成了處理接收的訊息、傳送訊息、處理檔案描述符傳遞等

function setipChannel(){
	channel.onread = function(arrayBuffer){
		//...
	}
	target.on('internalMessage', function(message, handle){
		//...
	})
	target.send = function(message, handle, options, callback){
		//...
	}
	target._send = function(message, handle, options, callback){
		//...
	}
	function handleMessage(message, handle, internal){
		//...
	}
}
  • target.send: process.send 方法,這裡 target 就是程序物件本身.
  • target._send: 執行具體 send 邏輯的函數, 當引數 handle 不存在時, 表示普通的訊息傳遞;若存在,包裝為內部物件,表明是一個 internalMessage 事件觸發。呼叫使用JSON.stringify 序列化物件, 使用channel.writeUtf8String 寫入檔案描述符中
  • channel.onread: 獲取到資料時觸發, 跟 channel.writeUtf8String 相對應。通過 JSON.parse 反序列化 message 之後, 呼叫 handleMessage 進而觸發對應事件
  • handleMessage: 用來判斷是觸發 message 事件還是 internalMessage 事件
  • target.on('internalMessage'): 針對內部物件做特殊處理,在呼叫 message 事件

11.png

程序間訊息傳遞

  • 父程序通過 child.send 傳送訊息 和 server/socket 控制程式碼物件

  • 普通訊息直接 JSON.stringify 序列化;對於控制程式碼物件來說,需要先包裝成為內部物件

    message = {
    	cmd: 'NODE_HANDLE',
    	type: null,
    	msg: message
    };

    通過 handleConversion.[message.type].send 的方法取出控制程式碼物件對應的 C++ 層面的 TCP 物件,在採用JSON.stringify 序列化

    const handleConversion = {
    	'net.Server': {
        simultaneousAccepts: true,
    
        send(message, server, options) {
          return server._handle;
        },
    
        got(message, handle, emit) {
          const server = new net.Server();
          server.listen(handle, () => {
            emit(server);
          });
        }
      }
    //....
    }
  • 最後將序列化後的內部物件和 TCP 物件寫入到 IPC 通道中

  • 子程序在接收到訊息之後,使用 JSON.parse 反序列化訊息,如果為內部物件觸發 internalMessage 事件

  • 檢查是否帶有 TCP 物件,通過 handleConversion.[message.type].got 得到和父程序一樣的控制程式碼物件

  • 最後發觸發 message 事件傳遞處理好的訊息和控制程式碼物件,子程序通過 process.on 接收

更多node相關知識,請存取:!

以上就是一文聊聊Node中的程序間通訊的詳細內容,更多請關注TW511.COM其它相關文章!