聊聊使用Node如何實現輕量化程序池和執行緒池

2022-10-14 22:01:18

node.js極速入門課程:進入學習

I. 前言

本文論點主要面向 Node.js 開發語言

>> Show Me Code,目前程式碼正在 dev 分支,已完成單元測試,尚待測試所有場景。

>> 建議通讀 Node.js 官方檔案 -【不要阻塞事件迴圈】

Node.js 即伺服器端 Javascript,得益於宿主環境的不同,它擁有比在瀏覽器上更多的能力。比如:完整的檔案系統存取許可權、網路協定、通訊端程式設計、程序和執行緒操作、C++ 外掛原始碼級的支援、Buffer 二進位制、Crypto 加密套件的天然支援。【相關教學推薦:】

Node.js 的是一門單執行緒的語言,它基於 V8 引擎開發,v8 在設計之初是在瀏覽器端對 JavaScript 語言的解析執行引擎,其最大的特點是單執行緒,這樣的設計避免了一些多執行緒狀態同步問題,使得其更輕量化易上手。

一、名詞定義

1. 程序

學術上說,程序是一個具有一定獨立功能的程式在一個資料集上的一次動態執行的過程,是作業系統進行資源分配和排程的一個獨立單位,是應用程式執行的載體。我們這裡將程序比喻為工廠的車間,它代表 CPU 所能處理的單個任務。任一時刻,CPU 總是執行一個程序,其他程序處於非執行狀態。

程序具有以下特性:

  • 程序是擁有資源的基本單位,資源分配給程序,同一程序的所有執行緒共用該程序的所有資源;
  • 程序之間可以並行執行;
  • 在建立或撤消程序時,系統都要為之分配和回收資源,與執行緒相比系統開銷較大;
  • 一個程序可以有多個執行緒,但至少有一個執行緒;

2. 執行緒

在早期的作業系統中並沒有執行緒的概念,程序是能擁有資源和獨立執行的最小單位,也是程式執行的最小單位。任務排程採用的是時間片輪轉的搶佔式排程方式,而程序是任務排程的最小單位,每個程序有各自獨立的一塊記憶體,使得各個程序之間記憶體地址相互隔離。

後來,隨著計算機的發展,對 CPU 的要求越來越高,程序之間的切換開銷較大,已經無法滿足越來越複雜的程式的要求了。於是就發明了執行緒,執行緒是程式執行中一個單一的順序控制流程,是程式執行流的最小單元。這裡把執行緒比喻一個車間的工人,即一個車間可以允許由多個工人協同完成一個任務,即一個程序中可能包含多個執行緒。

執行緒具有以下特性:

  • 執行緒作為排程和分配的基本單位;
  • 多個執行緒之間也可並行執行;
  • 執行緒是真正用來執行程式的,執行計算的;
  • 執行緒不擁有系統資源,但可以存取隸屬於程序的資源,一個執行緒只能屬於一個程序;

Node.js 的多程序有助於充分利用 CPU 等資源,Node.js 的多執行緒提升了單程序上任務的並行處理能力。

在 Node.js 中,每個 worker 執行緒都有他自己的 V8 範例和事件迴圈機制 (Event Loop)。但是,和程序不同,workers 之間是可以共用記憶體的。

二、Node.js 非同步機制

1. Node.js 內部執行緒池、非同步機制以及宏任務優先順序劃分

Node.js 的單執行緒是指程式的主要執行執行緒是單執行緒,這個主執行緒同時也負責事件迴圈。而其實語言內部也會建立執行緒池來處理主執行緒程式的 網路 IO / 檔案 IO / 定時器 等呼叫產生的非同步任務。一個例子就是定時器 Timer 的實現:在 Node.js 中使用定時器時,Node.js 會開啟一個定時器執行緒進行計時,計時結束時,定時器回撥函數會被放入位於主執行緒的宏任務佇列。當事件迴圈系統執行完主執行緒同步程式碼和當前階段的所有微任務時,該回撥任務最後再被取出執行。所以 Node.js 的定時器其實是不準確的,只能保證在預計時間時我們的回撥任務被放入佇列等待執行,而不是直接被執行。

1.png

多執行緒機制配合 Node.js 的 evet loop 事件迴圈系統讓開發者在一個執行緒內就能夠使用非同步機制,包括定時器、IO、網路請求。但為了實現高響應度的高效能伺服器,Node.js 的 Event Loop 在宏任務上進一步劃分了優先順序。

2.png

Node.js 宏任務之間的優先順序劃分:Timers > Pending > Poll > Check > Close。

  • Timers Callback: 涉及到時間,肯定越早執行越準確,所以這個優先順序最高很容易理解。
  • Pending Callback:處理網路、IO 等異常時的回撥,有的 unix 系統會等待發生錯誤的上報,所以得處理下。
  • Poll Callback:處理 IO 的 data,網路的 connection,伺服器主要處理的就是這個。
  • Check Callback:執行 setImmediate 的回撥,特點是剛執行完 IO 之後就能回撥這個。
  • Close Callback:關閉資源的回撥,晚點執行影響也不到,優先順序最低。

Node.js 微任務之間的優化及劃分:process.nextTick > Promise。

2. Node.js 宏任務和微任務的執行時機

node 11 之前,Node.js 的 Event Loop 並不是瀏覽器那種一次執行一個宏任務,然後執行所有的微任務,而是執行完一定數量的 Timers 宏任務,再去執行所有微任務,然後再執行一定數量的 Pending 的宏任務,然後再去執行所有微任務,剩餘的 Poll、Check、Close 的宏任務也是這樣。node 11 之後改為了每個宏任務都執行所有微任務了。

而 Node.js 的 宏任務之間也是有優先順序的,如果 Node.js 的 Event Loop 每次都是把當前優先順序的所有宏任務跑完再去跑下一個優先順序的宏任務,那麼會導致 「飢餓」 狀態的發生。如果某個階段宏任務太多,下個階段就一直執行不到了,所以每個型別的宏任務有個執行數量上限的機制,剩餘的交給之後的 Event Loop 再繼續執行。

最終表現就是:也就是執行一定數量的 Timers 宏任務,每個宏任務之間執行所有微任務,再一定數量的 Pending Callback 宏任務,每個宏任務之間再執行所有微任務。

三、Node.js 的多程序

1. 使用 child_process 方式手動建立程序

Node.js 程式通過 child_process 模組提供了衍生子程序的能力,child_process 提供多種子程序的建立方式:

  • spawn 建立新程序,執行結果以流的形式返回,只能通過事件來獲取結果資料,操作麻煩。
const spawn = require('child_process').spawn;
const ls = spawn('ls', ['-lh', '/usr']);

ls.stdout.on('data', (data) => {
  console.log(`stdout: ${data}`);
});

ls.stderr.on('data', (data) => {
  console.log(`stderr: ${data}`);
});

ls.on('close', (code) => {
  console.log(`child process exited with code ${code}`);
});
登入後複製
  • execFile 建立新程序,按照其後面的 File 名字,執行一個可執行檔案,可以帶選項,以回撥形式返回撥用結果,可以得到完整資料,方便了很多。
execFile('/path/to/node', ['--version'], function(error, stdout, stderr){
    if(error){
        throw error;
    }
    console.log(stdout);
});
登入後複製
  • exec 建立新程序,可以直接執行 shell 命令,簡化了 shell 命令執行方式,執行結果以回撥方式返回。
exec('ls -al', function(error, stdout, stderr){
    if(error) {
        console.error('error:' + error);
        return;
    }
    console.log('stdout:' + stdout);
    console.log('stderr:' + typeof stderr);
});
登入後複製
  • fork 建立新程序,執行 node 程式,程序擁有完整的 V8 範例,建立後自動開啟主程序到子程序的 IPC 通訊,資源佔用最多。
var child = child_process.fork('./anotherSilentChild.js', {
    silent: true
});

child.stdout.setEncoding('utf8');
child.stdout.on('data', function(data){
    console.log(data);
});
登入後複製

其中,spawn 是所有方法的基礎,exec 底層是呼叫了 execFile。

2. 使用 cluster 方式半自動建立程序

以下是使用 Cluster 模組建立一個 http 服務叢集的簡單範例。範例中建立 Cluster 時使用同一個 Js 執行檔案,在檔案內使用 cluster.isPrimary 判斷當前執行環境是在主程序還是子程序,如果是主程序則使用當前執行檔案建立子程序範例,如果時子程序則進入子程序的業務處理流程。

/*
  簡單範例:使用同一個 JS 執行檔案建立子程序叢集 Cluster
*/
const cluster = require('node:cluster');
const http = require('node:http');
const numCPUs = require('node:os').cpus().length;
const process = require('node:process');

if (cluster.isPrimary) {
  console.log(`Primary ${process.pid} is running`);
  // Fork workers.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }
  cluster.on('exit', (worker, code, signal) => {
    console.log(`worker ${worker.process.pid} died`);
  });
} else {
  // Workers can share any TCP connection
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end('hello world\n');
  }).listen(8000);
  console.log(`Worker ${process.pid} started`);
}
登入後複製

Cluster 模組允許設立一個主程序和若干個子程序,使用 child_process.fork() 在內部隱式建立子程序,由主程序監控和協調子程序的執行。

子程序之間採用程序間通訊交換訊息,Cluster 模組內建一個負載均衡器,採用 Round-robin 演演算法(輪流執行)協調各個子程序之間的負載。執行時,所有新建立的連線都由主程序完成,然後主程序再把 TCP 連線分配給指定的子程序。

使用叢集建立的子程序可以使用同一個埠,Node.js 內部對 http/net 內建模組進行了特殊支援。Node.js 主程序負責監聽目標埠,收到請求後根據負載均衡策略將請求分發給某一個子程序。

3. 使用基於 Cluster 封裝的 PM2 工具全自動建立程序

PM2 是常用的 node 程序管理工具,它可以提供 node.js 應用管理能力,如自動過載、效能監控、負載均衡等。

其主要用於 獨立應用 的程序化管理,在 Node.js 單機服務部署方面比較適合。可以用於生產環境下啟動同個應用的多個範例提高 CPU 利用率、抗風險、熱載入等能力。

由於是外部庫,需要使用 npm 包管理器安裝:

$: npm install -g pm2
登入後複製

pm2 支援直接執行 server.js 啟動專案,如下:

$: pm2 start server.js
登入後複製

即可啟動 Node.js 應用,成功後會看到列印的資訊:

┌──────────┬────┬─────────┬──────┬───────┬────────┬─────────┬────────┬─────┬───────────┬───────┬──────────┐
│ App name │ id │ version │ mode │ pid   │ status │ restart │ uptime │ cpu │ mem       │ user  │ watching │
├──────────┼────┼─────────┼──────┼───────┼────────┼─────────┼────────┼─────┼───────────┼───────┼──────────┤
│ server   │ 0  │ 1.0.0   │ fork │ 24776 │ online │ 9       │ 19m    │ 0%  │ 35.4 MB   │ 23101 │ disabled │
└──────────┴────┴─────────┴──────┴───────┴────────┴─────────┴────────┴─────┴───────────┴───────┴──────────┘
登入後複製

pm2 也支援組態檔啟動,通過組態檔 ecosystem.config.js 可以客製化 pm2 的各項引數:

module.exports = {
  apps : [{
    name: 'API', // 應用名
    script: 'app.js', // 啟動指令碼
    args: 'one two', // 命令列引數
    instances: 1, // 啟動範例數量
    autorestart: true, // 自動重新啟動
    watch: false, // 檔案更改監聽器
    max_memory_restart: '1G', // 最大記憶體使用亮
    env: { // development 預設環境變數
      // pm2 start ecosystem.config.js --watch --env development
      NODE_ENV: 'development'
    },
    env_production: { // production 自定義環境變數
      NODE_ENV: 'production'
    }
  }],

  deploy : {
    production : {
      user : 'node',
      host : '212.83.163.1',
      ref  : 'origin/master',
      repo : '[email protected]:repo.git',
      path : '/var/www/production',
      'post-deploy' : 'npm install && pm2 reload ecosystem.config.js --env production'
    }
  }
};
登入後複製

pm2 logs 紀錄檔功能也十分強大:

$: pm2 logs
登入後複製

II. Node.js 中程序池和執行緒池的適用場景

一般我們使用計算機執行的任務包含以下幾種型別的任務:

  • 計算密集型任務:任務包含大量計算,CPU 佔用率高。

    const matrix = {};
    for (let i = 0; i < 10000; i++) {
      for (let j = 0; j < 10000; j++) {
        matrix[`${i}${j}`] = i * j;
      }
    }
    登入後複製
  • IO 密集型任務:任務包含頻繁的、持續的網路 IO 和磁碟 IO 的呼叫。

    const {copyFileSync, constants} = require('fs');
    copyFileSync('big-file.zip', 'destination.zip');
    登入後複製
  • 混合型任務:既有計算也有 IO。

一、程序池的適用場景

使用程序池的最大意義在於充分利用多核 CPU 資源,同時減少子程序建立和銷燬的資源消耗

程序是作業系統分配資源的基本單位,使用多程序架構能夠更多的獲取 CPU 時間、記憶體等資源。為了應對 CPU-Sensitive 場景,以及充分發揮 CPU 多核效能,Node 提供了 child_process 模組用於建立子程序。

子程序的建立和銷燬需要較大的資源成本,因此池化子程序的建立和銷燬過程,利用程序池來管理所有子程序。

除了這一點,Node.js 中子程序也是唯一的執行二進位制檔案的方式,Node.js 可通過流 (stdin/stdout/stderr) 或 IPC 和子程序通訊。

通過 Stream 通訊

const {spawn} = require('child_process');
const ls = spawn('ls', ['-lh', '/usr']);

ls.stdout.on('data', (data) => {
  console.log(`stdout: ${data}`);
});

ls.stderr.on('data', (data) => {
  console.error(`stderr: ${data}`);
});

ls.on('close', (code) => {
  console.log(`child process exited with code ${code}`);
});
登入後複製

通過 IPC 通訊

const cp = require('child_process');
const n = cp.fork(`${__dirname}/sub.js`);

n.on('message', (m) => {
  console.log('PARENT got message:', m);
});

n.send({hello: 'world'});
登入後複製

二、執行緒池的適用場景

使用執行緒池的最大意義在於多工並行,為主執行緒降壓,同時減少執行緒建立和銷燬的資源消耗。單個 CPU 密集性的計算任務使用執行緒執行並不會更快,甚至執行緒的建立、銷燬、上下文切換、執行緒通訊、資料序列化等操作還會額外增加資源消耗。

但是如果一個計算機程式中有很多同一型別的阻塞任務需要執行,那麼將他們交給執行緒池可以成倍的減少任務總的執行時間,因為在同一時刻多個執行緒在並行進行計算。如果多個任務只使用主執行緒執行,那麼最終消耗的時間是線性疊加的,同時主執行緒阻塞之後也會影響其它任務的處理。

特別是對 Node.js 這種單主執行緒的語言來講,主執行緒如果消耗了過多的時間來執行這些耗時任務,那麼對整個 Node.js 單個程序範例的效能影響將是致命的。這些佔用著 CPU 時間的操作將導致其它任務獲取的 CPU 時間不足或 CPU 響應不夠及時,被影響的任務將進入 「飢餓」 狀態。

因此 Node.js 啟動後主執行緒應儘量承擔排程的角色,批次重型 CPU 佔用任務的執行應交由額外的工作執行緒處理,主執行緒最後拿到工作執行緒的執行結果再返回給任務呼叫方。另一方面由於 IO 操作 Node.js 內部作了優化和支援,因此 IO 操作應該直接交給主執行緒,主執行緒再使用內部執行緒池處理。

Node.js 的非同步能不能解決過多佔用 CPU 任務的執行問題?

答案是:不能,過多的非同步 CPU 佔用任務會阻塞事件迴圈。

Node.js 的非同步在 網路 IO / 磁碟 IO 處理上很有用,宏任務微任務系統 + 內部執行緒呼叫能分擔主程序的執行壓力。但是如果單獨將 CPU 佔用任務放入宏任務佇列或微任務佇列,對任務的執行速度提升沒有任何幫助,只是一種任務排程方式的優化而已。

我們只是延遲了任務的執行或是將巨大任務分散成多個再分批執行,但是任務最終還是要在主執行緒被執行。如果這類任務過多,那麼任務分片和延遲的效果將完全消失,一個任務可以,那十個一百個呢?量變將會引起質變。

以下是 Node.js 官方部落格中的原文:

「如果你需要做更復雜的任務,拆分可能也不是一個好選項。這是因為拆分之後任務仍然在事件迴圈執行緒中執行,並且你無法利用機器的多核硬體能力。 請記住,事件迴圈執行緒只負責協呼叫戶端的請求,而不是獨自執行完所有任務。 對一個複雜的任務,最好把它從事件迴圈執行緒轉移到工作執行緒池上。」

  • 場景:間歇性讓主程序 癱瘓

每一秒鐘,主執行緒有一半時間被佔用

// this task costs 100ms
function doHeavyTask() { ...}

setInterval(() => {
  doHeavyTask(); // 100ms
  doHeavyTask(); // 200ms
  doHeavyTask(); // 300ms
  doHeavyTask(); // 400ms
  doHeavyTask(); // 500ms
}, 1e3);
登入後複製
  • 場景:高頻性讓主程序 半癱瘓

每 200ms,主執行緒有一半時間被佔用

// this task costs 100ms
function doHeavyTask() { ...}

setInterval(() => {
  doHeavyTask();
}, 1e3);

setInterval(() => {
  doHeavyTask();
}, 1.2e3);

setInterval(() => {
  doHeavyTask();
}, 1.4e3);

setInterval(() => {
  doHeavyTask();
}, 1.6e3);

setInterval(() => {
  doHeavyTask();
}, 1.8e3);
登入後複製

以下是官方部落格的原文摘錄:

「因此,你應該保證永遠不要阻塞事件輪詢執行緒。換句話說,每個 JavaScript 回撥應該快速完成。這些當然對於 await,Promise.then 也同樣適用。」

III. 程序池

程序池是對程序的建立、執行任務、銷燬等流程進行管控的一個應用或是一套程式邏輯。之所以稱之為池是因為其內部包含多個程序範例,程序範例隨時都在程序池內進行著狀態流轉,多個建立的範例可以被重複利用,而不是每次執行完一系列任務後就被銷燬。因此,程序池的部分存在目的是為了減少程序建立的資源消耗。

此外程序池最重要的一個作用就是負責將任務分發給各個程序執行,各個程序的任務執行優先順序取決於程序池上的負載均衡運算,由演演算法決定應該將當前任務派發給哪個程序,以達到最高的 CPU 和記憶體利用率。常見的負載均衡演演算法有:

  • POLLING - 輪詢:子程序輪流處理請求
  • WEIGHTS - 權重:子程序根據設定的權重來處理請求
  • RANDOM - 隨機:子程序隨機處理請求
  • SPECIFY - 指定:子程序根據指定的程序 id 處理請求
  • WEIGHTS_POLLING - 權重輪詢:權重輪詢策略與輪詢策略類似,但是權重輪詢策略會根據權重來計運算元程序的輪詢次數,從而穩定每個子程序的平均處理請求數量。
  • WEIGHTS_RANDOM - 權重隨機:權重隨機策略與隨機策略類似,但是權重隨機策略會根據權重來計運算元程序的隨機次數,從而穩定每個子程序的平均處理請求數量。
  • MINIMUM_CONNECTION - 最小連線數:選擇子程序上具有最小連線活動數量的子程序處理請求。
  • WEIGHTS_MINIMUM_CONNECTION - 權重最小連線數:權重最小連線數策略與最小連線數策略類似,不過各個子程序被選中的概率由連線數和權重共同決定。

一、要點

「 對單一任務的控制不重要,對單個程序宏觀的資源佔用更需關注 」

二、流程設計

程序池架構圖參考之前的程序管理工具開發相關 文章,本文只需關注程序池部分。

3.png

1. 關鍵流程

  • 程序池建立程序時會初始化程序範例內的 ProcessHost 事務物件,程序範例向事務物件註冊多種任務監聽器。
  • 使用者向程序池發起單個任務呼叫請求,可傳入程序繫結的 ID 和指定的任務名。
  • 判斷使用者是否傳入 ID 引數指定使用某個程序執行任務,如果未指定 ID:
    • 程序池判斷當前程序池程序數量是否已超過最大值,如果未超過則建立新程序,用此程序處理當前任務,並將程序放入程序池。
    • 如果程序池程序數量已達最大值,則根據負載均衡演演算法選擇一個程序處理當前任務。
  • 指定 ID 時:
    • 通過使用者傳入的 ID 引數找到對應程序,將任務分發給此程序執行。
    • 如果未找到 ID 所對應的程序,則向使用者丟擲異常。
  • 任務由程序池派發給目標程序後,ProcessHost 事務物件會根據該任務的任務名觸發子程序內的監聽器。
  • 子程序內的監聽器函數可執行同步任務和非同步任務,非同步任務返回 Promise 物件,同步任務返回值。
  • ProcessHost 事務物件的監聽器函數執行完畢後,會將任務結果返回給程序池,程序池再將結果通過非同步回撥函數返回給使用者。
  • 使用者也可向程序池所有子程序發起個任務呼叫請求,最終將會通過 Promise 的返回所有子程序的任務執行結果。

2. 名詞解釋

  • ProcessHost 事務中心:執行在子程序中,用於事件觸發以及和主程序通訊。開發者在子程序執行檔案中向其註冊多個具有特定任務名的任務事件,主程序會向某個子程序傳送任務請求,並由事務中心呼叫指定的事件監聽器處理請求。
  • LoadBalancer 負載均衡器:用於選擇一個程序處理任務,可根據不同的負載均衡演演算法實現不同的選擇策略。
  • LifeCycle: 設計之初用於管控子程序的智慧啟停,某個程序在長時間未被使用時進入休眠狀態,當有新任務到來時再喚醒程序。目前還有些難點需要解決,比如程序的喚醒和休眠不好實現,程序的使用情況不好統計,該功能暫時不可用。

三、程序池使用方式

更多範例見:程序池 mocha 單元測試

1. 建立程序池

main.js

const { ChildProcessPool, LoadBalancer } = require('electron-re');

const processPool = new ChildProcessPool({
  path: path.join(__dirname, 'child_process/child.js'),
  max: 4,
  strategy: LoadBalancer.ALGORITHM.POLLING,
);
登入後複製

child.js

const { ProcessHost } = require('electron-re');

ProcessHost
  .registry('test1', (params) => {
    console.log('test1');
    return 1 + 1;
  })
  .registry('test2', (params) => {
    console.log('test2');
    return new Promise((resolve) => resolve(true));
  });
登入後複製

2. 向一個子程序傳送任務請求

processPool.send('test1', { value: "test1"}).then((result) => {
  console.log(result);
});
登入後複製

3. 向所有子程序傳送任務請求

processPool.sendToAll('test1', { value: "test1"}).then((results) => {
  console.log(results);
});
登入後複製

四、程序池實際使用場景

1. Electron 網頁代理工具中多程序的應用

1)基本代理原理:

4.png

2)單程序下使用者端執行原理:

  • 通過使用者預先儲存的伺服器設定資訊,使用 node.js 子程序來啟動 ss-local 可執行檔案建立和 ss 伺服器的連線來代理使用者本地電腦的流量,每個子程序佔用一個 socket 埠。
  • 其它支援 socks5 代理的 proxy 工具比如:瀏覽器上的 SwitchOmega 外掛會和這個埠的 tcp 服務建立連線,將 tcp 流量加密後通過代理伺服器轉發給我們需要存取的目標伺服器。

5.png

3)多程序下使用者端執行原理:

以上描述的是使用者端連線單個節點的工作模式,節點訂閱組中的負載均衡模式需要同時啟動多個子程序,每個子程序啟動 ss-local 執行檔案佔用一個本地埠並連線到遠端一個伺服器節點。

每個子程序啟動時選擇的埠是會變化的,因為某些埠可能已經被系統佔用,程式需要先選擇未被使用的埠。並且瀏覽器 proxy 工具也不可能同時連線到我們本地啟動的子程序上的多個 ss-local 服務上。因此需要一個佔用固定埠的中間節點接收 proxy 工具發出的連線請求,然後按照某種分發規則將 tcp 流量轉發到各個子程序的 ss-local 服務的埠上。

6.png

2. 多程序檔案分片上傳 Electron 使用者端

之前做過一個支援 SMB 協定多檔案分片上傳的使用者端,Node.js 端的上傳任務管理、IO 操作等都使用多程序實現過一版本,不過是在 gitlab 實驗分支自己搞得(逃)。

7.png

IV. 執行緒池

為了減小 CPU 密集型任務計算的系統開銷,Node.js 引入了新的特性:工作執行緒 worker_threads,其首次在 v10.5.0 作為實驗性功能出現。通過 worker_threads 可以在程序內建立多個執行緒,主執行緒與 worker 執行緒使用 parentPort 通訊,worker 執行緒之間可通過 MessageChannel 直接通訊。worker_threads 做為開發者使用執行緒的重要特性,在 v12.11.0 穩定版已經能正常在生產環境使用了。

但是執行緒的建立需要額外的 CPU 和記憶體資源,如果要多次使用一個執行緒的話,應該將其儲存起來,當該執行緒完全不使用時需要及時關閉以減少記憶體佔用。想象我們在需要使用執行緒時直接建立,使用完後立刻銷燬,可能執行緒自身的建立和銷燬成本已經超過了使用執行緒本身節省下的資源成本。Node.js 內部雖然有使用執行緒池,但是對於開發者而言是完全透明不可見的,因此封裝一個能夠維護執行緒生命週期的執行緒池工具的重要性就體現了。

為了強化多非同步任務的排程,執行緒池除了提供維護執行緒的能力,也提供維護任務佇列的能力。當傳送請求給執行緒池讓其執行一個非同步任務時,如果執行緒池內沒有空閒執行緒,那該任務就會被直接丟棄了,顯然這不是想要的效果。

因此可以考慮為執行緒池新增一個任務佇列的排程邏輯:當執行緒池沒有空閒執行緒時,將該任務放入待執行任務佇列 (FIFO),執行緒池在某個時機取出任務交由某個空閒執行緒執行,執行完成後觸發非同步回撥函數,將執行結果返回給請求呼叫方。但是執行緒池的任務佇列內的任務數量應該考慮限制到一個特殊值,防止執行緒池負載過大影響 Node.js 應用整體執行效能。

一、要點

「 對單一任務的控制重要,對單個執行緒的資源佔用無需關注 」

二、詳細設計

8.png

任務流轉過程

  • 呼叫者可通過 StaticPool/StaticExcutor/DynamicPool/DynamicExcutor 範例向執行緒池派發任務(以下有關鍵名詞說明),各種範例的之間最大的不同點就是引數動態化能力。

  • 任務由執行緒池內部生成,生成後任務做為主要的流轉載體,一方面承載使用者傳入的任務計算引數,另一方面記錄任務流轉過程中的狀態變化,比如:任務狀態、開始時間、結束時間、任務 ID、任務重試次數、任務是否支援重試、任務型別等。

  • 任務生成後,首先判斷當前執行緒池的執行緒數是否已達上限,如果未達上限,則新建執行緒並將其放入執行緒儲存區,然後使用該執行緒直接執行當前任務。

  • 如果執行緒池執行緒數超限,則判斷是否有未執行任務的空閒執行緒,拿到空閒執行緒後,使用該執行緒直接執行當前任務。

  • 如果沒有空閒執行緒,則判斷當前等待任務佇列是否已滿,任務佇列已滿則丟擲錯誤,第一時間讓呼叫者感知任務未執行成功。

  • 如果任務佇列未滿的話,將該任務放入任務佇列,等待任務迴圈系統取出將其執行。

  • 以上 4/5/6 步的三種情況下任務執行後,判斷該任務是否執行成功,成功時觸發成功的回撥函數,Promise 狀態為 fullfilled。如果失敗,則判斷是否支援重試,支援重試的情況下,將該任務重試次數 + 1 後重新放入任務佇列尾部。任務不支援重試的情況下,直接失敗,並觸發失敗的非同步回撥函數,Promise 狀態為 rejected。

  • 整個執行緒池生命週期中,存在一個任務迴圈系統,以一定的週期頻率從任務佇列首部獲取任務,並從執行緒儲存區域獲取空閒執行緒後使用該執行緒執行任務,該流程也符合第 7 步的描述。

  • 任務迴圈系統除了取任務執行,如果執行緒池設定了任務超時時間的話,也會判斷正在執行中的任務是否超時,超時後會終止該執行緒的所有執行中的程式碼。

模組說明

  • StaticPool
    • 定義:靜態執行緒池,可使用固定的 execFunction/execString/execFile 執行引數來啟動工作執行緒,執行引數在程序池建立後不能更改。
    • 程序池建立之後除了執行引數不可變外,其它引數比如:任務超時時間、任務重試次數、執行緒池任務輪詢間隔時間、最大任務數、最大執行緒數、是否懶建立執行緒等都可以通過 API 隨時更改。
  • StaticExcutor
    • 定義:靜態執行緒池的執行器範例,繼承所屬執行緒池的固定執行引數 execFunction/execString/execFile 且不可更改。
    • 執行器範例建立之後除了執行引數不可變外,其它引數比如:任務超時時間、任務重試次數、transferList 等都可以通過 API 隨時更改。
    • 靜態執行緒池的各個執行器範例的引數設定互不影響,引數預設繼承於所屬執行緒池,引數在執行器上更改後具有比所屬執行緒池同名引數更高的優先順序。
  • DynamicPool
    • 定義:動態執行緒池,無需使用 execFunction/execString/execFile 執行引數即可建立執行緒池。執行引數在呼叫 exec() 方法時動態傳入,因此執行引數可能不固定。
    • 執行緒池建立之後執行引數預設為 null,其它引數比如:任務超時時間、任務重試次數、transferList 等都可以通過 API 隨時更改。
  • DynamicExcutor
    • 定義:動態執行緒池的執行器範例,繼承所屬執行緒池的其它引數,執行引數為 null
    • 執行器範例建立之後,其它引數比如:任務超時時間、任務重試次數、transferList 等都可以通過 API 隨時更改。
    • 動態執行緒池的各個執行器範例的引數設定互不影響,引數預設繼承於所屬執行緒池,引數在執行器上更改後具有比所屬執行緒池同名引數更高的優先順序。
    • 動態執行器範例在執行任務之前需要先設定執行引數 execFunction/execString/execFile,執行引數可以隨時改變。
  • ThreadGenerator
    • 定義:執行緒建立的工廠方法,會進行引數校驗。
  • Thread
    • 定義:執行緒範例,內部簡單封裝了 worker_threads API。
  • TaskGenerator
    • 定義:任務建立的工廠方法,會進行引數校驗。
  • Task
    • 定義:單個任務,記錄了任務執行狀態、任務開始結束時間、任務重試次數、任務攜帶引數等。
  • TaskQueue
    • 定義:任務佇列,在陣列中存放任務,以先入先出方式 (FIFO) 向執行緒池提供任務,使用 Map 來儲存 taskId 和 task 之間的對映關係。
  • Task Loop
    • 任務迴圈,每個迴圈的預設時間間隔為 2S,每次迴圈中會處理超時任務、將新任務派發給空閒執行緒等。

三、執行緒池使用方式

更多範例見:執行緒池 mocha 單元測試

1. 建立靜態執行緒池

main.js

const { StaticThreadPool } = require(`electron-re`);
const threadPool = new StaticThreadPool({
  execPath: path.join(__dirname, './worker_threads/worker.js'),
  lazyLoad: true, // 懶載入
  maxThreads: 24, // 最大執行緒數
  maxTasks: 48, // 最大任務數
  taskRetry: 1, // 任務重試次數
  taskLoopTime: 1e3, // 任務輪詢時間
});
const executor = threadPool.createExecutor();
登入後複製

worker.js

const fibonaccis = (n) => {
  if (n < 2) {
    return n;
  }
  return fibonaccis(n - 1) + fibonaccis(n - 2);
};

module.exports = (value) => {
  return fibonaccis(value);
}
登入後複製

2. 使用靜態執行緒池傳送任務請求

threadPool.exec(15).then((res) => {
  console.log(+res.data === 610)
});

executor
  .setTaskRetry(2) // 不影響 pool 的全域性設定
  .setTaskTimeout(2e3) // 不影響 pool 的全域性設定
  .exec(15).then((res) => {
    console.log(+res.data === 610)
  });
登入後複製

3. 動態執行緒池和動態執行器

const { DynamicThreadPool } = require(`electron-re`);
const threadPool = new DynamicThreadPool({
  maxThreads: 24, // 最大執行緒數
  maxTasks: 48, // 最大任務數
  taskRetry: 1, // 任務重試次數
});
const executor = threadPool.createExecutor({
  execFunction: (value) => { return 'dynamic:' + value; },
});

threadPool.exec('test', {
  execString: `module.exports = (value) => { return 'dynamic:' + value; };`,
});
executor.exec('test');
executor
  .setExecPath('/path/to/exec-file.js')
  .exec('test');
登入後複製

四、執行緒池實際使用場景

暫未在專案中實際使用,可考慮在前端圖片畫素處理、音視訊轉碼處理等 CPU 密集性任務中進行實踐。

這裡有篇文章寫了 web_worker 的一些應用場景,web_worker 和 worker_threads 是類似的,宿主環境不同,一些許可權和能力的不同而已。

V. 結尾

最開始 專案 做為 Electron 應用開發的一個工具集提供了 BrowserService / ChildProcessPool / 簡易程序監控 UI / 程序間通訊 等功能,執行緒池的加入其實是當初沒有計劃的,而且執行緒池本身是獨立的,不依賴 electron-re 中其它模組功能,之後應該會被獨立出去。

程序池和執行緒池的實現方案上還需完善。

比如程序池未支援子程序空閒時自動退出以解除資源佔用,當時做了另一版監聽 ProcessHost 的任務執行情況來讓子程序空閒時休眠,想通過此方式節省資源佔用。不過由於沒有 node.js API 級別的支援以分辨子程序空閒的情況,並且子程序的休眠 / 喚醒功能比較雞肋 (有嘗試通過向子程序傳送 SIGSTOP/SIGCONT 訊號實現),最終這個特性被廢除了。

後面可以考慮支援 CPU/Memory 的負載均衡演演算法,目前已經通過專案中的 ProcessManager 模組來實現資源佔用情況採集了。

執行緒池方面相對的可用度還是較高,提供了 pool/excutor 兩個層級的呼叫管理,支援鏈式呼叫,在一些需要提升資料傳輸效能的場景支援 transferList 方式避免資料克隆。相對於其它開源 Node 執行緒池方案,著重對任務佇列功能進行了加強,支援任務重試、任務超時等功能。

更多node相關知識,請存取:!

以上就是聊聊使用Node如何實現輕量化程序池和執行緒池的詳細內容,更多請關注TW511.COM其它相關文章!