Node.js 官方提供了 Cluster 和 Child process 建立子程序,通過 Worker threads 模組建立子執行緒。但前者無法共用記憶體,通訊必須使用 JSON 格式,有一定的侷限性和效能問題。後者更輕量,並且可以共用記憶體,通過傳輸 ArrayBuffer 範例或共用 SharedArrayBuffer 範例來做到這一點,即資料格式沒有太多要求。但是要注意,資料中不能包含函數。
Worker threads 從 Node V12 開始成為正式標準,其對於執行 CPU 密集型的操作很有用,而對 I/O 密集型工作沒有多大幫助。 Node.js 內建的非同步 I/O 操作要比它效率更高。注意,Worker threads 是基於 Node.js 架構的多工作執行緒,如下圖所示。在每個工作執行緒中,都會包含 V8 和 libuv,即都包含Event Loop。
建立、執行、銷燬一個 Worker 的開銷是很大的,所以需要實現一個執行緒池(Worker Pool),在初始化時建立有限數量的 Worker 並載入單一的 worker.js,主執行緒和 Worker 可進行程序間通訊,當所有任務完成後,這些 Worker 將會被統一銷燬。
在 Worker 中通過 parentPort.postMessage() 向主執行緒傳送訊息,而在主執行緒中可以通過 worker.on('message') 接收傳送過來的訊息,worker 是一個 Worker 範例,例如 new Worker(filePath)。
下面是一個官方範例,isMainThread 可判斷當前是否是主執行緒,workerData 是傳遞給 Worker 的資料。
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads'); if (isMainThread) { module.exports = function parseJSAsync(script) { return new Promise((resolve, reject) => { const worker = new Worker(__filename, { workerData: script }); worker.on('message', resolve); worker.on('error', reject); worker.on('exit', (code) => { if (code !== 0) reject(new Error(`Worker stopped with exit code ${code}`)); }); }); }; } else { const script = workerData; parentPort.postMessage(script); }
下面是一個執行緒池範例,參考自《worker_threads 初體驗》一文,做了微調,具體在此不在贅述,可閱讀原文或註釋。
// 獲取當前裝置的 CPU 執行緒數目,作為 numberOfThreads 的預設值。 const { length: cpusLength } = require('os').cpus(); const { Worker } = require('worker_threads'); class WorkerPool { constructor(workerPath, options = {}, numberOfThreads = cpusLength) { if (numberOfThreads < 1) { throw new Error('Number of threads should be greater or equal than 1!'); } this.workerPath = workerPath; this.numberOfThreads = numberOfThreads; // 任務佇列 this._queue = []; // Worker 索引 this._workersById = {}; // Worker 啟用狀態索引 this._activeWorkersById = {}; // 建立 Workers for (let i = 0; i < this.numberOfThreads; i++) { const worker = new Worker(workerPath, options); this._workersById[i] = worker; // 將這些 Worker 設定為未啟用狀態 this._activeWorkersById[i] = false; } } /** * 檢查空閒的 Worker */ getInactiveWorkerId() { for (let i = 0; i < this.numberOfThreads; i++) { if (!this._activeWorkersById[i]) return i; } return -1; } /** * 呼叫 Worker 執行,目的是在指定的 Worker 裡執行指定的任務 */ runWorker(workerId, taskObj) { const worker = this._workersById[workerId]; // 當任務執行完畢後執行 const doAfterTaskIsFinished = () => { // 去除所有的 Listener,不然一次次新增不同的 Listener 會記憶體溢位(OOM) worker.removeAllListeners('message'); worker.removeAllListeners('error'); // 將這個 Worker 設為未啟用狀態 this._activeWorkersById[workerId] = false; if (this._queue.length) { // 任務佇列非空,使用該 Worker 執行任務佇列中第一個任務 this.runWorker(workerId, this._queue.shift()); } }; // 將這個 Worker 設定為啟用狀態 this._activeWorkersById[workerId] = true; // 設定兩個回撥,用於 Worker 的監聽器 const messageCallback = result => { taskObj.cb(null, result); doAfterTaskIsFinished(); }; const errorCallback = error => { taskObj.cb(error); doAfterTaskIsFinished(); }; // 為 Worker 新增 'message' 和 'error' 兩個 Listener worker.once('message', messageCallback); worker.once('error', errorCallback); // 將資料傳給 Worker 供其獲取和執行 worker.postMessage(taskObj.data); } /** * 執行執行緒 */ run(data) { // Promise 是個好東西 return new Promise((resolve, reject) => { // 呼叫 getInactiveWorkerId() 獲取一個空閒的 Worker const availableWorkerId = this.getInactiveWorkerId(); const taskObj = { data, cb: (error, result) => { // 雖然 Workers 需要使用 Listener 和 Callback,但這不能阻止我們使用 Promise,對吧? // 不,你不能 util.promisify(taskObj) 。人不能,至少不應該。 if (error) reject(error); return resolve(result); } }; if (availableWorkerId === -1) { // 當前沒有空閒的 Workers 了,把任務丟進佇列裡,這樣一旦有 Workers 空閒時就會開始執行。 this._queue.push(taskObj); return null; } // 有一個空閒的 Worker,用它執行任務 this.runWorker(availableWorkerId, taskObj); }) } /** * 銷燬 */ destroy(force = false) { for (let i = 0; i < this.numberOfThreads; i++) { if (this._activeWorkersById[i] && !force) { // 通常情況下,不應該在還有 Worker 在執行的時候就銷燬它,這一定是什麼地方出了問題,所以還是拋個 Error 比較好 // 不過保留一個 force 引數,總有人用得到的 throw new Error(`The worker ${i} is still runing!`); } // 銷燬這個 Worker this._workersById[i].terminate(); } } } module.exports = WorkerPool;
之所以需要多執行緒,是為了解決一個優化需求。就是有一個介面,裡面有很多查詢資料庫(MySQL和MongoDB)的操作,單條語句並不會慢,但累加後整體的響應速度就會變慢,那麼就想通過多執行緒,同時處理一些查詢語句,然後整合結果。
先對執行緒池做最簡單的處理,建立 worker.js,接收 userId。
const { isMainThread, parentPort } = require('worker_threads'); // 不是主執行緒時執行 if (!isMainThread) { parentPort.on('message', async ({userId }) => { console.log('postMessage', userId); parentPort.postMessage(userId); }); }
然後初始化執行緒池,將陣列中的 userId 傳遞給 Worker,pool.run({ userId: item })。
const WorkerPool = require('./workerPool'); const { join } = require('path'); async function workerMain(services) { const workerPath = join(__dirname + '/worker.js'); // 初始化一個 Worker Pool const pool = new WorkerPool(workerPath); Promise.all([4,12,13,15].map(async item => { await pool.run({ userId: item }); })).then(json => { // 銷燬執行緒池 pool.destroy(); }); }
輸出順序沒有按照陣列的順序,並且每次的輸出順序還都是不同的,由此可知,程式碼是並行執行的。
postMessage 12 postMessage 4 postMessage 15 postMessage 13
那麼接下來就引入資料庫查詢的程式碼,公司專案基於 sequelize.js 封裝了增刪改查的邏輯,通過 services 變數可以呼叫相關的操作。在主執行緒中,計劃將 services 傳遞到 Worker 中。
async function workerMain(services) { // Worker Threads 不能共用範例以及帶函數的物件 const workerPath = join(__dirname + '/worker.js', { workerData: services }); // 初始化一個 Worker Pool const pool = new WorkerPool(workerPath); // 省略程式碼...... }
然而報錯了,大致是下面這個意思,無法克隆,因為物件中包含函數,就會引發錯誤。
node:internal/worker:349 ReflectApply(this[kPublicPort].postMessage, this[kPublicPort], args); could not be cloned.
想以通訊的方式實現資料庫的並行查詢,目前看來不能完成。
其實可以在 worker.js 中單獨引入 services, 不過由於我們在指令碼檔案中採用了 import 語法,因此在執行時會報錯,SyntaxError: Cannot use import statement outside a module。
const { isMainThread, parentPort, workerData } = require('worker_threads'); const services = require('../services'); // 不是主執行緒時執行 if (!isMainThread) { // 省略程式碼...... }
還有一種解決方案,其成本就比較高,就是單獨再實現一套服務層,也就是說再封裝一層符合Node.js 模組化語法的資料庫操作集合。