聊聊Node.js中的多程序和多執行緒

2022-07-25 22:00:46
大家都知道 是單執行緒的,卻不知它也提供了多進(線)程模組來加速處理一些特殊任務,本文便帶領大家瞭解下 Node.js 的多進(線)程,希望對大家有所幫助!

我們都知道 採用的是單執行緒、基於事件驅動的非同步 I/O 模型,其特性決定了它無法利用 CPU 多核的優勢,也不善於完成一些非 I/O 型別的操作(比如執行指令碼、AI 計算、影象處理等),為了解決此類問題,Node.js 提供了常規的多進(執行緒)方案(關於程序、執行緒的討論,可參見筆者的另一篇文章 ),本文便為大家介紹 Node.js 的多進(線)程機制。

child_process

我們可使用 child_process 模組建立 Node.js 的子程序,來完成一些特殊的任務(比如執行指令碼),該模組主要提供了 execexecFileforkspwan 等方法,下面我們就簡單介紹下這些方法的使用。

exec

const { exec } = require('child_process');

exec('ls -al', (error, stdout, stderr) => {
  console.log(stdout);
});

該方法根據 options.shell 指定的可執行檔案處理命令字串,在命令的執行過程中快取其輸出,直到命令執行完成後,再將執行結果以回撥函數引數的形式返回。

該方法的引數解釋如下:

  • command:將要執行的命令(比如 ls -al);

  • options:引數設定(可不指定),相關屬性如下:

    • cwd:子程序的當前工作目錄,預設取 process.cwd() 的值;

    • env:環境變數設定(為鍵值對物件),預設取 process.env 的值;

    • encoding:字元編碼,預設值為:utf8

    • shell:處理命令字串的可執行檔案,Unix 上預設值為 /bin/shWindows 上預設值取 process.env.ComSpec 的值(如為空則為 cmd.exe);比如:

      const { exec } = require('child_process');
      
      exec("print('Hello World!')", { shell: 'python' }, (error, stdout, stderr) => {
        console.log(stdout);
      });

      執行上面的例子將輸出 Hello World!,這等同於子程序執行了 python -c "print('Hello World!')" 命令,因此在使用該屬性時需要注意,所指定的可執行檔案必須支援通過 -c 選項來執行相關語句。

      注:碰巧 Node.js 也支援 -c 選項,但它等同於 --check 選項,只用來檢測指定的指令碼是否存在語法錯誤,並不會執行相關指令碼。

    • signal:使用指定的 AbortSignal 終止子程序,該屬性在 v14.17.0 以上可用,比如:

      const { exec } = require('child_process');
      
      const ac = new AbortController();
      exec('ls -al', { signal: ac.signal }, (error, stdout, stderr) => {});

      上例中,我們可通過呼叫 ac.abort() 來提前終止子程序。

    • timeout:子程序的超時時間(如果該屬性的值大於 0,那麼當子程序執行時間超過指定值時,將會給子程序傳送屬性 killSignal 指定的終止訊號),單位毫米,預設值為 0

    • maxBuffer:stdout 或 stderr 所允許的最大快取(二進位制),如果超出,子程序將會被殺死,並且將會截斷任何輸出,預設值為 1024 * 1024

    • killSignal:子程序終止訊號,預設值為 SIGTERM

    • uid:執行子程序的 uid

    • gid:執行子程序的 gid

    • windowsHide:是否隱藏子程序的控制檯視窗,常用於 Windows 系統,預設值為 false

  • callback:回撥函數,包含 errorstdoutstderr 三個引數:

    • error:如果命令列執行成功,值為 null,否則值為 Error 的一個範例,其中 error.code 為子程序的退出的錯誤碼,error.signal 為子程序終止的訊號;
    • stdoutstderr:子程序的 stdoutstderr,按照 encoding 屬性的值進行編碼,如果 encoding 的值為 buffer,或者 stdoutstderr 的值是一個無法識別的字串,將按照 buffer 進行編碼。

execFile

const { execFile } = require('child_process');

execFile('ls', ['-al'], (error, stdout, stderr) => {
  console.log(stdout);
});

該方法的功能類似於 exec,唯一的區別是 execFile 在預設情況下直接用指定的可執行檔案(即引數 file 的值)處理命令,這使得其效率略高於 exec(如果檢視 shell 的處理邏輯,筆者感覺這效率可忽略不計)。

該方法的引數解釋如下:

  • file:可執行檔案的名字或路徑;

  • args:可執行檔案的參數列;

  • options:引數設定(可不指定),相關屬性如下:

    • shell:值為 false 時表示直接用指定的可執行檔案(即引數 file 的值)處理命令,值為 true 或其它字串時,作用等同於 exec 中的 shell,預設值為 false
    • windowsVerbatimArguments:在 Windows 中是否對引數進行引號或跳脫處理,在 Unix 中將忽略該屬性,預設值為 false
    • 屬性 cwdenvencodingtimeoutmaxBufferkillSignaluidgidwindowsHidesignal 在上文中已介紹,此處不再重述。
  • callback:回撥函數,等同於 exec 中的 callback,此處不再闡述。

fork

const { fork } = require('child_process');

const echo = fork('./echo.js', {
  silent: true
});
echo.stdout.on('data', (data) => {
  console.log(`stdout: ${data}`);
});

echo.stderr.on('data', (data) => {
  console.error(`stderr: ${data}`);
});

echo.on('close', (code) => {
  console.log(`child process exited with code ${code}`);
});

該方法用於建立新的 Node.js 範例以執行指定的 Node.js 指令碼,與父程序之間以 IPC 方式進行通訊。

該方法的引數解釋如下:

  • modulePath:要執行的 Node.js 指令碼路徑;

  • args:傳遞給 Node.js 指令碼的參數列;

  • options:引數設定(可不指定),相關屬性如:

    • detached:參見下文對 spwanoptions.detached 的說明;

    • execPath:建立子程序的可執行檔案;

    • execArgv:傳遞給可執行檔案的字串參數列,預設取 process.execArgv 的值;

    • serialization:程序間訊息的序列號型別,可用值為 jsonadvanced,預設值為 json

    • slient: 如果為 true,子程序的 stdinstdoutstderr 將通過管道傳遞給父程序,否則將繼承父程序的 stdinstdoutstderr;預設值為 false

    • stdio:參見下文對 spwanoptions.stdio 的說明。這裡需要注意的是:

      • 如果指定了該屬性,將忽略 slient 的值;
      • 必須包含一個值為 ipc 的選項(比如 [0, 1, 2, 'ipc']),否則將丟擲異常。
    • 屬性 cwdenvuidgidwindowsVerbatimArgumentssignaltimeoutkillSignal 在上文中已介紹,此處不再重述。

spwan

const { spawn } = require('child_process');

const ls = spawn('ls', ['-al']);
ls.stdout.on('data', (data) => {
  console.log(`stdout: ${data}`);
});

ls.stderr.on('data', (data) => {
  console.error(`stderr: ${data}`);
});

ls.on('close', (code) => {
  console.log(`child process exited with code ${code}`);
});

該方法為 child_process 模組的基礎方法,execexecFilefork 最終都會呼叫 spawn 來建立子程序。

該方法的引數解釋如下:

  • command:可執行檔案的名字或路徑;

  • args:傳遞給可執行檔案的參數列;

  • options:引數設定(可不指定),相關屬性如下:

    • argv0:傳送給子程序 argv[0] 的值,預設取引數 command 的值;

    • detached:是否允許子程序可以獨立於父程序執行(即父程序退出後,子程序可以繼續執行),預設值為 false,其值為 true 時,各平臺的效果如下所述:

      • Windows 系統中,父程序退出後,子程序可以繼續執行,並且子程序擁有自己的控制檯視窗(該特性一旦啟動後,在執行過程中將無法更改);
      • 在非 Windows 系統中,子程序將作為新程序對談組的組長,此刻不管子程序是否與父程序分離,子程序都可以在父程序退出後繼續執行。

      需要注意的是,如果子程序需要執行長時間的任務,並且想要父程序提前退出,需要同時滿足以下幾點:

      • 呼叫子程序的 unref 方法從而將子程序從父程序的事件迴圈中剔除;
      • detached 設定為 true
      • stdioignore

      比如下面的例子:

      // hello.js
      const fs = require('fs');
      let index = 0;
      function run() {
        setTimeout(() => {
          fs.writeFileSync('./hello', `index: ${index}`);
          if (index < 10) {
            index += 1;
            run();
          }
        }, 1000);
      }
      run();
      
      // main.js
      const { spawn } = require('child_process');
      const child = spawn('node', ['./hello.js'], {
        detached: true,
        stdio: 'ignore'
      });
      child.unref();
    • stdio:子程序標準輸入輸出設定,預設值為 pipe,值為字串或陣列:

      • 值為字串時,會將其轉換為含有三個項的陣列(比如 pipe 被轉換為 ['pipe', 'pipe', 'pipe']),可用值為 pipeoverlappedignoreinherit
      • 值為陣列時,其中陣列的前三項分別代表對 stdinstdoutstderr 的設定,每一項的可用值為 pipeoverlappedignoreinheritipcStream 物件、正整數(在父程序開啟的檔案描述符)、null(如位於陣列的前三項,等同於 pipe,否則等同於 ignore)、undefined(如位於陣列的前三項,等同於 pipe,否則等同於 ignore)。
    • 屬性 cwdenvuidgidserializationshell(值為 booleanstring)、windowsVerbatimArgumentswindowsHidesignaltimeoutkillSignal 在上文中已介紹,此處不再重述。

小結

上文對 child_process 模組中主要方法的使用進行了簡短介紹,由於 execSyncexecFileSyncforkSyncspwanSync 方法是 execexecFilespwan 的同步版本,其引數並無任何差異,故不再重述。

cluster

通過 cluster 模組我們可以建立 Node.js 程序叢集,通過 Node.js 程序進群,我們可以更加充分地利用多核的優勢,將程式任務分發到不同的程序中以提高程式的執行效率;下面將通過例子為大家介紹 cluster 模組的使用:

const http = require('http');
const cluster = require('cluster');
const numCPUs = require('os').cpus().length;

if (cluster.isPrimary) {
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }
} else {
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end(`${process.pid}\n`);
  }).listen(8000);
}

上例通過 cluster.isPrimary 屬性判斷(即判斷當前程序是否為主程序)將其分為兩個部分:

  • 為真時,根據 CPU 核心的數量並通過 cluster.fork 呼叫來建立相應數量的子程序;
  • 為假時,建立一個 HTTP server,並且每個 HTTP server 都監聽同一個埠(此處為 8000)。

執行上面的例子,並在瀏覽器中存取 http://localhost:8000/,我們會發現每次存取返回的 pid 都不一樣,這說明了請求確實被分發到了各個子程序。Node.js 預設採用的負載均衡策略是輪詢排程,可通過環境變數 NODE_CLUSTER_SCHED_POLICYcluster.schedulingPolicy 屬性來修改其負載均衡策略:

NODE_CLUSTER_SCHED_POLICY = rr // 或 none

cluster.schedulingPolicy = cluster.SCHED_RR; // 或 cluster.SCHED_NONE

另外需要注意的是,雖然每個子程序都建立了 HTTP server,並都監聽了同一個埠,但並不代表由這些子程序自由競爭使用者請求,因為這樣無法保證所有子程序的負載達到均衡。所以正確的流程應該是由主程序監聽埠,然後將使用者請求根據分發策略轉發到具體的子程序進行處理。

由於程序之間是相互隔離的,因此程序之間一般通過共用記憶體訊息傳遞管道等機制進行通訊。Node.js 則是通過訊息傳遞來完成父子程序之間的通訊,比如下面的例子:

const http = require('http');
const cluster = require('cluster');
const numCPUs = require('os').cpus().length;

if (cluster.isPrimary) {
  for (let i = 0; i < numCPUs; i++) {
    const worker = cluster.fork();
    worker.on('message', (message) => {
      console.log(`I am primary(${process.pid}), I got message from worker: "${message}"`);
      worker.send(`Send message to worker`)
    });
  }
} else {
  process.on('message', (message) => {
    console.log(`I am worker(${process.pid}), I got message from primary: "${message}"`)
  });
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end(`${process.pid}\n`);
    process.send('Send message to primary');
  }).listen(8000);
}

執行上面的例子,並存取 http://localhost:8000/,再檢視終端,我們會看到類似下面的輸出:

I am primary(44460), I got message from worker: "Send message to primary"
I am worker(44461), I got message from primary: "Send message to worker"
I am primary(44460), I got message from worker: "Send message to primary"
I am worker(44462), I got message from primary: "Send message to worker"

利用該機制,我們可以監聽各子程序的狀態,以便在某個子程序出現意外後,能夠及時對其進行干預,以保證服務的可用性。

cluster 模組的介面非常簡單,為了節省篇幅,這裡只對 cluster.setupPrimary 方法做一些特別宣告,其它方法請檢視官方檔案

  • cluster.setupPrimary 呼叫後,相關設定將同步到在 cluster.settings 屬性中,並且每次呼叫都基於當前 cluster.settings 屬性的值;
  • cluster.setupPrimary 呼叫後,對已執行的子程序沒有影響,隻影響後續的 cluster.fork 呼叫;
  • cluster.setupPrimary 呼叫後,不影響後續傳遞給 cluster.fork 呼叫的 env 引數;
  • cluster.setupPrimary 只能在主程序中使用。

worker_threads

前文我們對 cluster 模組進行了介紹,通過它我們可以建立 Node.js 程序叢集以提高程式的執行效率,但 cluster 基於多程序模型,程序間高成本的切換以及程序間資源的隔離,會隨著子程序數量的增加,很容易導致因系統資源緊張而無法響應的問題。為解決此類問題,Node.js 提供了 worker_threads,下面我們通過具體的例子對該模組的使用進行簡單介紹:

// server.js
const http = require('http');
const { Worker } = require('worker_threads');

http.createServer((req, res) => {
  const httpWorker = new Worker('./http_worker.js');
  httpWorker.on('message', (result) => {
    res.writeHead(200);
    res.end(`${result}\n`);
  });
  httpWorker.postMessage('Tom');
}).listen(8000);

// http_worker.js
const { parentPort } = require('worker_threads');

parentPort.on('message', (name) => {
  parentPort.postMessage(`Welcone ${name}!`);
});

上例展示了 worker_threads 的簡單使用,在使用 worker_threads 的過程中,需要注意以下幾點:

  • 通過 worker_threads.Worker 建立 Worker 範例,其中 Worker 指令碼既可以為一個獨立的 JavaScript 檔案,也可以為字串,比如上例可修改為:

    const code = "const { parentPort } = require('worker_threads'); parentPort.on('message', (name) => {parentPort.postMessage(`Welcone ${name}!`);})";
    const httpWorker = new Worker(code, { eval: true });
  • 通過 worker_threads.Worker 建立 Worker 範例時,可以通過指定 workerData 的值來設定 Worker 子執行緒的初始後設資料,比如:

    // server.js
    const { Worker } = require('worker_threads');
    const httpWorker = new Worker('./http_worker.js', { workerData: { name: 'Tom'} });
    
    // http_worker.js
    const { workerData } = require('worker_threads');
    console.log(workerData);
  • 通過 worker_threads.Worker 建立 Worker 範例時,可通過設定 SHARE_ENV 以實現在 Worker 子執行緒與主執行緒之間共用環境變數的需求,比如:

    const { Worker, SHARE_ENV } = require('worker_threads');
    const worker = new Worker('process.env.SET_IN_WORKER = "foo"', { eval: true, env: SHARE_ENV });
    worker.on('exit', () => {
      console.log(process.env.SET_IN_WORKER);
    });
  • 不同於 cluster 中程序間的通訊機制,worker_threads 採用的 MessageChannel 來進行執行緒間的通訊:

    • Worker 子執行緒通過 parentPort.postMessage 方法傳送訊息給主執行緒,並通過監聽 parentPortmessage 事件來處理來自主執行緒的訊息;
    • 主執行緒通過 Worker 子執行緒範例(此處為 httpWorker,以下均以此代替 Worker 子執行緒)的 postMessage 方法傳送訊息給 httpWorker,並通過監聽 httpWorkermessage 事件來處理來自 Worker 子執行緒的訊息。

在 Node.js 中,無論是 cluster 建立的子程序,還是 worker_threads 建立的 Worker 子執行緒,它們都擁有屬於自己的 V8 範例以及事件迴圈,所不同的是:

  • 子程序之間的記憶體空間是互相隔離的,而 Worker 子執行緒共用所屬程序的記憶體空間;
  • 子程序之間的切換成本要遠遠高於 Worker 子執行緒之間的切換成本。

儘管看起來 Worker 子執行緒比子程序更高效,但 Worker 子執行緒也有不足的地方,即cluster 提供了負載均衡,而 worker_threads 則需要我們自行完成負載均衡的設計與實現。

總結

本文介紹了 Node.js 中 child_processclusterworker_threads 三個模組的使用,通過這三個模組,我們可以充分利用 CPU 多核的優勢,並以多進(線)程的模式來高效地解決一些特殊任務(比如 AI、圖片處理等)的執行效率。每個模組都有其適用的場景,文中僅對其基本使用進行了說明,如何結合自己的問題進行高效地運用,還需要大家自行摸索。最後,本文若有紕漏之處,還望大家能夠指正,祝大家快樂編碼每一天。

更多node相關知識,請存取:!

以上就是聊聊Node.js中的多程序和多執行緒的詳細內容,更多請關注TW511.COM其它相關文章!