Node.js躬行記(23)——Worker threads

2022-09-26 12:00:41

  Node.js 官方提供了 ClusterChild process 建立子程序,通過 Worker threads 模組建立子執行緒。但前者無法共用記憶體,通訊必須使用 JSON 格式,有一定的侷限性和效能問題。後者更輕量,並且可以共用記憶體,通過傳輸 ArrayBuffer 範例或共用 SharedArrayBuffer 範例來做到這一點,即資料格式沒有太多要求。但是要注意,資料中不能包含函數。

  Worker threads 從 Node V12 開始成為正式標準,其對於執行 CPU 密集型的操作很有用,而對 I/O 密集型工作沒有多大幫助。 Node.js 內建的非同步 I/O 操作要比它效率更高。注意,Worker threads 是基於 Node.js 架構的多工作執行緒,如下圖所示。在每個工作執行緒中,都會包含 V8 和 libuv,即都包含Event Loop。

  

一、執行緒池

  建立、執行、銷燬一個 Worker 的開銷是很大的,所以需要實現一個執行緒池(Worker Pool),在初始化時建立有限數量的 Worker 並載入單一的 worker.js,主執行緒和 Worker 可進行程序間通訊,當所有任務完成後,這些 Worker 將會被統一銷燬。

  在 Worker 中通過 parentPort.postMessage() 向主執行緒傳送訊息,而在主執行緒中可以通過 worker.on('message') 接收傳送過來的訊息,worker 是一個 Worker 範例,例如 new Worker(filePath)。

  下面是一個官方範例,isMainThread 可判斷當前是否是主執行緒,workerData 是傳遞給 Worker 的資料。

const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
if (isMainThread) {
  module.exports = function parseJSAsync(script) {
    return new Promise((resolve, reject) => {
      const worker = new Worker(__filename, {
        workerData: script
      });
      worker.on('message', resolve);
      worker.on('error', reject);
      worker.on('exit', (code) => {
        if (code !== 0)
          reject(new Error(`Worker stopped with exit code ${code}`));
      });
    });
  };
} else {
  const script = workerData;
  parentPort.postMessage(script);
}

  下面是一個執行緒池範例,參考自《worker_threads 初體驗》一文,做了微調,具體在此不在贅述,可閱讀原文或註釋。

// 獲取當前裝置的 CPU 執行緒數目,作為 numberOfThreads 的預設值。
const { length: cpusLength } = require('os').cpus();
const { Worker } = require('worker_threads');

class WorkerPool {
  constructor(workerPath, options = {}, numberOfThreads = cpusLength) {
    if (numberOfThreads < 1) {
      throw new Error('Number of threads should be greater or equal than 1!');
    }
    this.workerPath = workerPath;
    this.numberOfThreads = numberOfThreads;
    // 任務佇列
    this._queue = [];
    // Worker 索引
    this._workersById = {};
    // Worker 啟用狀態索引
    this._activeWorkersById = {};
    // 建立 Workers
    for (let i = 0; i < this.numberOfThreads; i++) {
      const worker = new Worker(workerPath, options);
      this._workersById[i] = worker;
      // 將這些 Worker 設定為未啟用狀態
      this._activeWorkersById[i] = false;
    }
  }
  /**
   * 檢查空閒的 Worker
   */
  getInactiveWorkerId() {
    for (let i = 0; i < this.numberOfThreads; i++) {
      if (!this._activeWorkersById[i]) return i;
    }
    return -1;
  }
  /**
   * 呼叫 Worker 執行,目的是在指定的 Worker 裡執行指定的任務
   */
  runWorker(workerId, taskObj) {
    const worker = this._workersById[workerId];
    // 當任務執行完畢後執行
    const doAfterTaskIsFinished = () => {
      // 去除所有的 Listener,不然一次次新增不同的 Listener 會記憶體溢位(OOM)
      worker.removeAllListeners('message');
      worker.removeAllListeners('error');
      // 將這個 Worker 設為未啟用狀態
      this._activeWorkersById[workerId] = false;
  
      if (this._queue.length) {
        // 任務佇列非空,使用該 Worker 執行任務佇列中第一個任務
        this.runWorker(workerId, this._queue.shift());
      }
    };
    // 將這個 Worker 設定為啟用狀態
    this._activeWorkersById[workerId] = true;
    // 設定兩個回撥,用於 Worker 的監聽器
    const messageCallback = result => {
      taskObj.cb(null, result);
      doAfterTaskIsFinished();
    };
    const errorCallback = error => {
      taskObj.cb(error);
      doAfterTaskIsFinished();
    };
    // 為 Worker 新增 'message' 和 'error' 兩個 Listener
    worker.once('message', messageCallback);
    worker.once('error', errorCallback);
    // 將資料傳給 Worker 供其獲取和執行
    worker.postMessage(taskObj.data);
  }
  /**
   * 執行執行緒
   */
  run(data) {
    // Promise 是個好東西
    return new Promise((resolve, reject) => {
      // 呼叫 getInactiveWorkerId() 獲取一個空閒的 Worker
      const availableWorkerId = this.getInactiveWorkerId();
      const taskObj = {
        data,
        cb: (error, result) => {
          // 雖然 Workers 需要使用 Listener 和 Callback,但這不能阻止我們使用 Promise,對吧?
          // 不,你不能 util.promisify(taskObj) 。人不能,至少不應該。
          if (error) reject(error);
          return resolve(result);
        }
      };
      if (availableWorkerId === -1) {
        // 當前沒有空閒的 Workers 了,把任務丟進佇列裡,這樣一旦有 Workers 空閒時就會開始執行。
        this._queue.push(taskObj);
        return null;
      }
      // 有一個空閒的 Worker,用它執行任務
      this.runWorker(availableWorkerId, taskObj);
    })
  }
  /**
   * 銷燬
   */
   destroy(force = false) {
    for (let i = 0; i < this.numberOfThreads; i++) {
      if (this._activeWorkersById[i] && !force) {
        // 通常情況下,不應該在還有 Worker 在執行的時候就銷燬它,這一定是什麼地方出了問題,所以還是拋個 Error 比較好
        // 不過保留一個 force 引數,總有人用得到的
        throw new Error(`The worker ${i} is still runing!`);
      }
      // 銷燬這個 Worker
      this._workersById[i].terminate();
    }
  }
}
module.exports = WorkerPool;

二、實踐

  之所以需要多執行緒,是為了解決一個優化需求。就是有一個介面,裡面有很多查詢資料庫(MySQL和MongoDB)的操作,單條語句並不會慢,但累加後整體的響應速度就會變慢,那麼就想通過多執行緒,同時處理一些查詢語句,然後整合結果。

  先對執行緒池做最簡單的處理,建立 worker.js,接收 userId。

const { isMainThread, parentPort } = require('worker_threads');
// 不是主執行緒時執行
if (!isMainThread) {
  parentPort.on('message', async ({userId }) => {
    console.log('postMessage', userId);
    parentPort.postMessage(userId);
  });
}

  然後初始化執行緒池,將陣列中的 userId 傳遞給 Worker,pool.run({ userId: item })。

const WorkerPool = require('./workerPool');
const { join } = require('path');
async function workerMain(services) {
  const workerPath = join(__dirname + '/worker.js');
  // 初始化一個 Worker Pool
  const pool = new WorkerPool(workerPath);
  Promise.all([4,12,13,15].map(async item => {
    await pool.run({ userId: item });
  })).then(json => {
    // 銷燬執行緒池
    pool.destroy();
  });
}

  輸出順序沒有按照陣列的順序,並且每次的輸出順序還都是不同的,由此可知,程式碼是並行執行的。

postMessage 12
postMessage 4
postMessage 15
postMessage 13

  那麼接下來就引入資料庫查詢的程式碼,公司專案基於 sequelize.js 封裝了增刪改查的邏輯,通過 services 變數可以呼叫相關的操作。在主執行緒中,計劃將 services 傳遞到 Worker 中。

async function workerMain(services) {
  // Worker Threads 不能共用範例以及帶函數的物件
  const workerPath = join(__dirname + '/worker.js', { workerData: services });
  // 初始化一個 Worker Pool
  const pool = new WorkerPool(workerPath);
  // 省略程式碼......
}

  然而報錯了,大致是下面這個意思,無法克隆,因為物件中包含函數,就會引發錯誤。

node:internal/worker:349     
ReflectApply(this[kPublicPort].postMessage, this[kPublicPort], args);
could not be cloned. 

  想以通訊的方式實現資料庫的並行查詢,目前看來不能完成。

  其實可以在 worker.js 中單獨引入 services, 不過由於我們在指令碼檔案中採用了 import 語法,因此在執行時會報錯,SyntaxError: Cannot use import statement outside a module。

const { isMainThread, parentPort, workerData } = require('worker_threads');
const services = require('../services');
// 不是主執行緒時執行
if (!isMainThread) {
  // 省略程式碼......
}

  還有一種解決方案,其成本就比較高,就是單獨再實現一套服務層,也就是說再封裝一層符合Node.js 模組化語法的資料庫操作集合。