前端(vue)入門到精通課程:進入學習
在 Linux 系統中,一切都看成檔案,當程序開啟現有檔案時,會返回一個檔案描述符。 檔案描述符是作業系統為了管理已經被程序開啟的檔案所建立的索引,用來指向被開啟的檔案。 當我們的程序啟動之後,作業系統會給每一個程序分配一個 PCB 控制塊,PCB 中會有一個檔案描述符表,存放當前程序所有的檔案描述符,即當前程序開啟的所有檔案。
? 程序中的檔案描述符是如何和系統檔案對應起來的? 在核心中,系統會維護另外兩種表
檔案描述符就是陣列的下標,從0開始往上遞增,0/1/2 預設是我們的輸入/輸出/錯誤流的檔案描述符 在 PCB 中維護的檔案描述表中,可以根據檔案描述符找到對應了檔案指標,找到對應的開啟檔案表 開啟檔案表中維護了:檔案偏移量(讀寫檔案的時候會更新);對於檔案的狀態標識;指向 i-node 表的指標 想要真正的操作檔案,還得靠 i-node 表,能夠獲取到真實檔案的相關資訊
他們之間的關係
圖解
總結
每次讀寫程序的時候,都是從檔案描述符下手,找到對應的開啟檔案表項,再找到對應的 i-node 表
?如何實現檔案描述符重定向? 因為在檔案描述符表中,能夠找到對應的檔案指標,如果我們改變了檔案指標,是不是後續的兩個表內容就發生了改變 例如:檔案描述符1指向的顯示器,那麼將檔案描述符1指向 log.txt 檔案,那麼檔案描述符 1 也就和 log.txt 對應起來了
> 是輸出重定向符號,< 是輸入重定向符號,它們是檔案描述符操作符 > 和 < 通過修改檔案描述符改變了檔案指標的指向,來能夠實現重定向的功能
我們使用cat hello.txt
時,預設會將結果輸出到顯示器上,使用 > 來重定向。cat hello.txt 1 > log.txt
以輸出的方式開啟檔案 log.txt,並繫結到檔案描述符1上
dup 函數是用來開啟一個新的檔案描述符,指向和 oldfd 同一個檔案,共用檔案偏移量和檔案狀態
int main(int argc, char const *argv[]) { int fd = open("log.txt"); int copyFd = dup(fd); //將fd閱讀檔案置於檔案末尾,計算偏移量。 cout << "fd = " << fd << " 偏移量: " << lseek(fd, 0, SEEK_END) << endl; //現在我們計算copyFd的偏移量 cout << "copyFd = " << copyFd << "偏移量:" << lseek(copyFd, 0, SEEK_CUR) << endl; return 0; }
呼叫 dup(3) 的時候,會開啟新的最小描述符,也就是4,這個4指向了3所指向的檔案,操作任意一個 fd 都是修改的一個檔案
dup2 函數,把指定的 newfd 也指向 oldfd 指向的檔案。執行完dup2之後,newfd 和 oldfd 同時指向同一個檔案,共用檔案偏移量和檔案狀態
int main(int argc, char const *argv[]) { int fd = open("log.txt"); int copyFd = dup(fd); //將fd閱讀檔案置於檔案末尾,計算偏移量。 cout << "fd = " << fd << " 偏移量: " << lseek(fd, 0, SEEK_END) << endl; //現在我們計算copyFd的偏移量 cout << "copyFd = " << copyFd << "偏移量:" << lseek(copyFd, 0, SEEK_CUR) << endl; return 0; }
Node 中的 IPC 通道具體實現是由 libuv 提供的。根據系統的不同實現方式不同,window 下采用命名管道實現,*nix 下采用 Domain Socket 實現。在應用層只體現為 message 事件和 send 方法。【相關教學推薦:】
父程序在實際建立子程序之前,會建立 IPC 通道並監聽它,等到建立出真實的子程序後,通過環境變數(NODE_CHANNEL_FD)告訴子程序該 IPC 通道的檔案描述符。
子程序在啟動的過程中,會根據該檔案描述符去連線 IPC 通道,從而完成父子程序的連線。
建立連線之後可以自由的通訊了,IPC 通道是使用命名管道或者 Domain Socket 建立的,屬於雙向通訊。並且它是在系統核心中完成的程序通訊
⚠️ 只有在啟動的子程序是 Node 程序時,子程序才會根據環境變數去連線對應的 IPC 通道,對於其他型別的子程序則無法實現程序間通訊,除非其他程序也按著該約定去連線這個 IPC 通道。
我們知道經典的通訊方式是有 Socket,我們平時熟知的 Socket 是基於網路協定的,用於兩個不同主機上的兩個程序通訊,通訊需要指定 IP/Host 等。 但如果我們同一臺主機上的兩個程序想要通訊,如果使用 Socket 需要指定 IP/Host,經過網路協定等,會顯得過於繁瑣。所以 Unix Domain Socket 誕生了。
UDS 的優勢:
int main(int argc, char *argv[]) { int server_fd ,ret, client_fd; struct sockaddr_un serv, client; socklen_t len = sizeof(client); char buf[1024] = {0}; int recvlen; // 建立 socket server_fd = socket(AF_LOCAL, SOCK_STREAM, 0); // 初始化 server 資訊 serv.sun_family = AF_LOCAL; strcpy(serv.sun_path, "server.sock"); // 繫結 ret = bind(server_fd, (struct sockaddr *)&serv, sizeof(serv)); //設定監聽,設定能夠同時和伺服器端連線的使用者端數量 ret = listen(server_fd, 36); //等待使用者端連線 client_fd = accept(server_fd, (struct sockaddr *)&client, &len); printf("=====client bind file:%s\n", client.sun_path); while (1) { recvlen = recv(client_fd, buf, sizeof(buf), 0); if (recvlen == -1) { perror("recv error"); return -1; } else if (recvlen == 0) { printf("client disconnet...\n"); close(client_fd); break; } else { printf("recv buf %s\n", buf); send(client_fd, buf, recvlen, 0); } } close(client_fd); close(server_fd); return 0; }
int main(int argc, char *argv[]) { int client_fd ,ret; struct sockaddr_un serv, client; socklen_t len = sizeof(client); char buf[1024] = {0}; int recvlen; //建立socket client_fd = socket(AF_LOCAL, SOCK_STREAM, 0); //給使用者端繫結一個通訊端檔案 client.sun_family = AF_LOCAL; strcpy(client.sun_path, "client.sock"); ret = bind(client_fd, (struct sockaddr *)&client, sizeof(client)); //初始化server資訊 serv.sun_family = AF_LOCAL; strcpy(serv.sun_path, "server.sock"); //連線 connect(client_fd, (struct sockaddr *)&serv, sizeof(serv)); while (1) { fgets(buf, sizeof(buf), stdin); send(client_fd, buf, strlen(buf)+1, 0); recv(client_fd, buf, sizeof(buf), 0); printf("recv buf %s\n", buf); } close(client_fd); return 0; }
命名管道是可以在同一臺計算機的不同程序之間,或者跨越一個網路的不同計算機的不同程序之間的可靠的單向或者雙向的資料通訊。 建立命名管道的程序被稱為管道伺服器端(Pipe Server),連線到這個管道的程序稱為管道使用者端(Pipe Client)。
命名管道的命名規範:\server\pipe[\path]\name
void ServerTest() { HANDLE serverNamePipe; char pipeName[MAX_PATH] = {0}; char szReadBuf[MAX_BUFFER] = {0}; char szWriteBuf[MAX_BUFFER] = {0}; DWORD dwNumRead = 0; DWORD dwNumWrite = 0; strcpy(pipeName, "\\\\.\\pipe\\shuangxuPipeTest"); // 建立管道範例 serverNamePipe = CreateNamedPipeA(pipeName, PIPE_ACCESS_DUPLEX|FILE_FLAG_WRITE_THROUGH, PIPE_TYPE_BYTE|PIPE_READMODE_BYTE|PIPE_WAIT, PIPE_UNLIMITED_INSTANCES, 0, 0, 0, NULL); WriteLog("建立管道成功..."); // 等待使用者端連線 BOOL bRt= ConnectNamedPipe(serverNamePipe, NULL ); WriteLog( "收到使用者端的連線成功..."); // 接收資料 memset( szReadBuf, 0, MAX_BUFFER ); bRt = ReadFile(serverNamePipe, szReadBuf, MAX_BUFFER-1, &dwNumRead, NULL ); // 業務邏輯處理 (只為測試用返回原來的資料) WriteLog( "收到客戶資料:[%s]", szReadBuf); // 傳送資料 if( !WriteFile(serverNamePipe, szWriteBuf, dwNumRead, &dwNumWrite, NULL ) ) { WriteLog("向客戶寫入資料失敗:[%#x]", GetLastError()); return ; } WriteLog("寫入資料成功..."); }
void ClientTest() { char pipeName[MAX_PATH] = {0}; HANDLE clientNamePipe; DWORD dwRet; char szReadBuf[MAX_BUFFER] = {0}; char szWriteBuf[MAX_BUFFER] = {0}; DWORD dwNumRead = 0; DWORD dwNumWrite = 0; strcpy(pipeName, "\\\\.\\pipe\\shuangxuPipeTest"); // 檢測管道是否可用 if(!WaitNamedPipeA(pipeName, 10000)){ WriteLog("管道[%s]無法開啟", pipeName); return ; } // 連線管道 clientNamePipe = CreateFileA(pipeName, GENERIC_READ|GENERIC_WRITE, 0, NULL, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, NULL); WriteLog("管道連線成功..."); scanf( "%s", szWritebuf ); // 傳送資料 if( !WriteFile(clientNamePipe, szWriteBuf, strlen(szWriteBuf), &dwNumWrite, NULL)){ WriteLog("傳送資料失敗,GetLastError=[%#x]", GetLastError()); return ; } printf("傳送資料成功:%s\n", szWritebuf ); // 接收資料 if( !ReadFile(clientNamePipe, szReadBuf, MAX_BUFFER-1, &dwNumRead, NULL)){ WriteLog("接收資料失敗,GetLastError=[%#x]", GetLastError() ); return ; } WriteLog( "接收到伺服器返回:%s", szReadBuf ); // 關閉管道 CloseHandle(clientNamePipe); }
對於建立子程序、建立管道、重定向管道均是在 c++ 層實現的
int main(int argc,char *argv[]){ pid_t pid = fork(); if (pid < 0) { // 錯誤 } else if(pid == 0) { // 子程序 } else { // 父程序 } }
使用 socketpair 建立管道,其建立出來的管道是全雙工的,返回的檔案描述符中的任何一個都可讀和可寫
int main () { int fd[2]; int r = socketpair(AF_UNIX, SOCK_STREAM, 0, fd); if (fork()){ /* 父程序 */ int val = 0; close(fd[1]); while (1){ sleep(1); ++val; printf("傳送資料: %d\n", val); write(fd[0], &val, sizeof(val)); read(fd[0], &val, sizeof(val)); printf("接收資料: %d\n", val); } } else { /*子程序*/ int val; close(fd[0]); while(1){ read(fd[1], &val, sizeof(val)); ++val; write(fd[1], &val, sizeof(val)); } } }
當我們使用 socketpair 建立了管道之後,父程序關閉了 fd[1],子程序關閉了 fd[0]。子程序可以通過 fd[1] 讀寫資料;同理主程序通過 fd[0]讀寫資料完成通訊。
對應程式碼:https://github.com/nodejs/node/blob/main/deps/uv/src/unix/process.c#L344
fork 函數開啟一個子程序的流程
初始化引數中的 options.stdio,並且呼叫 spawn 函數
function spawn(file, args, options) { const child = new ChildProcess(); child.spawn(options); }
建立 ChildProcess 範例,建立子程序也是呼叫 C++ 層 this._handle.spawn 方法
function ChildProcess() { // C++層定義 this._handle = new Process(); }
通過 child.spawn 呼叫到 ChildProcess.prototype.spawn 方法中。其中 getValidStdio 方法會根據 options.stdio 建立和 C++ 互動的 Pipe 物件,並獲得對應的檔案描述符,將檔案描述符寫入到環境變數 NODE_CHANNEL_FD 中,呼叫 C++ 層建立子程序,在呼叫 setupChannel 方法
ChildProcess.prototype.spawn = function(options) { // 預處理程序間通訊的資料結構 stdio = getValidStdio(stdio, false); const ipc = stdio.ipc; const ipcFd = stdio.ipcFd; //將檔案描述符寫入環境變數中 if (ipc !== undefined) { ArrayPrototypePush(options.envPairs, `NODE_CHANNEL_FD=${ipcFd}`); } // 建立程序 const err = this._handle.spawn(options); // 新增send方法和監聽IPC中資料 if (ipc !== undefined) setupChannel(this, ipc, serialization); }
子程序啟動時,會根據環境變數中是否存在 NODE_CHANNEL_FD 判斷是否呼叫 _forkChild 方法,建立一個 Pipe 物件, 同時呼叫 open 方法開啟對應的檔案描述符,在呼叫setupChannel
function _forkChild(fd, serializationMode) { const p = new Pipe(PipeConstants.IPC); p.open(fd); p.unref(); const control = setupChannel(process, p, serializationMode); }
setupChannel 主要是完成了處理接收的訊息、傳送訊息、處理檔案描述符傳遞等
function setipChannel(){ channel.onread = function(arrayBuffer){ //... } target.on('internalMessage', function(message, handle){ //... }) target.send = function(message, handle, options, callback){ //... } target._send = function(message, handle, options, callback){ //... } function handleMessage(message, handle, internal){ //... } }
程序間訊息傳遞
父程序通過 child.send 傳送訊息 和 server/socket 控制程式碼物件
普通訊息直接 JSON.stringify 序列化;對於控制程式碼物件來說,需要先包裝成為內部物件
message = { cmd: 'NODE_HANDLE', type: null, msg: message };
通過 handleConversion.[message.type].send 的方法取出控制程式碼物件對應的 C++ 層面的 TCP 物件,在採用JSON.stringify 序列化
const handleConversion = { 'net.Server': { simultaneousAccepts: true, send(message, server, options) { return server._handle; }, got(message, handle, emit) { const server = new net.Server(); server.listen(handle, () => { emit(server); }); } } //.... }
最後將序列化後的內部物件和 TCP 物件寫入到 IPC 通道中
子程序在接收到訊息之後,使用 JSON.parse 反序列化訊息,如果為內部物件觸發 internalMessage 事件
檢查是否帶有 TCP 物件,通過 handleConversion.[message.type].got 得到和父程序一樣的控制程式碼物件
最後發觸發 message 事件傳遞處理好的訊息和控制程式碼物件,子程序通過 process.on 接收
更多node相關知識,請存取:!
以上就是一文聊聊Node中的程序間通訊的詳細內容,更多請關注TW511.COM其它相關文章!