node.js「多執行緒」如何處理高並行任務?

2020-12-29 21:00:25
下面本篇文章給大家介紹一下使用 「多執行緒」處理高並行任務的方法。有一定的參考價值,有需要的朋友可以參考一下,希望對大家有所幫助。

相關推薦:《》

摩爾定律

摩爾定律是由英特爾聯合創始人戈登·摩爾(Gordon Moore)在 1965 年提出的,即積體電路上可容納的元器件的數量每隔 18 至 24 個月就會增加一倍,效能也將提升一倍。也就是說,處理器(CPU)的效能每隔大約兩年就會翻一倍。

距離摩爾定律被提出到現在,已經過去了 50 多年。如今,隨著晶片元件的規模越來越接近單個原子的規模,要跟上摩爾定律的步伐變得越來越困難。

在 2019 年,英偉達 CEO 黃仁勳在 ECS 展會上說:「摩爾定律過去是每 5 年增長 10 倍,每 10 年增長 100 倍。而如今,摩爾定律每年只能增長几個百分點,每 10 年可能只有 2 倍。因此,摩爾定律結束了。」

單個處理器(CPU)的效能越來越接近瓶頸,想要突破這個瓶頸,則需要充分利用 多執行緒技術,讓單個或多個 CPU 可以同時執行多個執行緒,更快的完成計算機任務。

Node 的多執行緒

我們都知道,Javascript 是單執行緒語言,Nodejs 利用 Javascript 的特性,使用事件驅動模型,實現了非同步 I/O,而非同步 I/O 的背後就是多執行緒排程。

Node 非同步 I/O 的實現可以參考樸靈的 《深入淺出 Node.js》

Go 語言中,可以通過建立 Goroutine 來顯式呼叫一條新執行緒,並且通過環境變數 GOMAXPROCS 來控制最大並行數。

Node 中,沒有 API 可以顯式建立新執行緒的 ,Node 實現了一些非同步 I/O 的 API,例如 fs.readFilehttp.request。這些非同步 I/O 底層是呼叫了新執行緒執行非同步任務,再利用事件驅動的模式來獲取執行結果。

伺服器端開發、工具開發可能都會需要使用到多執行緒開發。比如使用多執行緒處理複雜的爬蟲任務,用多執行緒來處理並行請求,使用多執行緒進行檔案處理等等...

在我們使用多執行緒時,一定要控制最大同時並行數。因為不控制最大並行數,可能會導致 檔案描述符 耗盡引發的錯誤,頻寬不足引發的網路錯誤、埠限制引發的錯誤等等。

Node 中並沒有用於控制最大並行數的 API 或者環境變數,所以接下來,我們就用幾行簡單的程式碼來實現。

程式碼實現

我們先假設下面的一個需求場景,我有一個爬蟲,需要每天爬取 100 篇掘金的文章,如果一篇一篇爬取的話太慢,一次爬取 100 篇會因為網路連線數太多,導致很多請求直接失敗。

那我們可以來實現一下,每次請求 10 篇,分 10 次完成。這樣不僅可以把效率提升 10 倍,並且可以穩定執行。

下面來看看單個請求任務,程式碼實現如下:

const axios = require("axios");

async function singleRequest(article_id) {
  // 這裡我們直接使用 axios 庫進行請求
  const reply = await axios.post(
    "https://api.juejin.cn/content_api/v1/article/detail",
    {
      article_id,
    }
  );

  return reply.data;
}

為了方便演示,這裡我們 100 次請求的都是同一個地址,我們來建立 100 個請求任務,程式碼實現如下:

// 請求任務列表
const requestFnList = new Array(100)
  .fill("6909002738705629198")
  .map((id) => () => singleRequest(id));

接下來,我們來實現並行請求的方法。這個方法支援同時執行多個非同步任務,並且可以限制最大並行數。在任務池的一個任務執行完成後,新的非同步任務會被推入繼續執行,以保證任務池的高利用率。程式碼實現如下:

const chalk = require("chalk");
const { log } = require("console");

/**
 * 執行多個非同步任務
 * @param {*} fnList 任務列表
 * @param {*} max 最大並行數限制
 * @param {*} taskName 任務名稱
 */
async function concurrentRun(fnList = [], max = 5, taskName = "未命名") {
  if (!fnList.length) return;

  log(chalk.blue(`開始執行多個非同步任務,最大並行數: ${max}`));
  const replyList = []; // 收集任務執行結果
  const count = fnList.length; // 總任務數量
  const startTime = new Date().getTime(); // 記錄任務執行開始時間

  let current = 0;
  // 任務執行程式
  const schedule = async (index) => {
    return new Promise(async (resolve) => {
      const fn = fnList[index];
      if (!fn) return resolve();

      // 執行當前非同步任務
      const reply = await fn();
      replyList[index] = reply;
      log(`${taskName} 事務進度 ${((++current / count) * 100).toFixed(2)}% `);

      // 執行完當前任務後,繼續執行任務池的剩餘任務
      await schedule(index + max);
      resolve();
    });
  };

  // 任務池執行程式
  const scheduleList = new Array(max)
    .fill(0)
    .map((_, index) => schedule(index));
  // 使用 Promise.all 批次執行
  const r = await Promise.all(scheduleList);

  const cost = (new Date().getTime() - startTime) / 1000;
  log(chalk.green(`執行完成,最大並行數: ${max},耗時:${cost}s`));
  return replyList;
}

從上面的程式碼可以看出,使用 Node 進行並行請求的關鍵就是 Promise.allPromise.all 可以同時執行多個非同步任務。

在上面的程式碼中,建立了一個長度為 max 最大並行數長度的陣列,陣列裡放了對應數量的非同步任務。然後使用 Promise.all 同時執行這些非同步任務,當單個非同步任務執行完成時,會在任務池取出一個新的非同步任務繼續執行,完成了效率最大化。

接下來,我們用下面這段程式碼進行執行測試(程式碼實現如下)

(async () => {
  const requestFnList = new Array(100)
    .fill("6909002738705629198")
    .map((id) => () => singleRequest(id));

  const reply = await concurrentRun(requestFnList, 10, "請求掘金文章");
})();

最終執行結果如下圖所示:

1.jpg

到這裡,我們的並行請求就完成啦!接下來我們分別來測試一下不同並行的速度吧~ 首先是 1 個並行,也就是沒有並行(如下圖)

2.jpg

耗時 11.462 秒!當不使用並行時,任務耗時非常長,接下來我們看看在其他並行數的情況下耗時(如下圖)

3.jpg

4.jpg

5.jpg

6.jpg

從上圖可以看出,隨著我們並行數的提高,任務執行速度越來越快!這就是高並行的優勢,可以在某些情況下提升數倍乃至數十倍的效率!

我們仔細看看上面的耗時會發現,隨著並行數的增加,耗時還是會有一個閾值,不能完全呈倍數增加。這是因為 Node 實際上並沒有為每一個任務開一個執行緒進行處理,而只是為非同步 I/O 任務開啟了新的執行緒。所以,Node 比較適合處理 I/O 密集型任務,並不適合 CPU(計算)密集型任務。

到這裡,我們的使用 Node 「多執行緒」處理高並行任務就介紹完了。如果想要程式完善一點的話,還需要考慮到任務超時時間、容錯機制,大家感興趣的可以自己實現一下。

更多程式設計相關知識,請存取:!!

以上就是node.js「多執行緒」如何處理高並行任務?的詳細內容,更多請關注TW511.COM其它相關文章!