很多人都想知道單執行緒的 Node.js 怎麼能與多執行緒後端競爭。考慮到其所謂的單執行緒特性,許多大公司選擇 Node 作爲其後端似乎違反直覺。要想知道原因,必須理解其單執行緒的真正含義。【視訊教學推薦: 】
JavaScript 的設計非常適合在網上做比較簡單的事情,比如驗證表單,或者說建立彩虹色的滑鼠軌跡。 在2009年,Node.js的創始人 Ryan Dahl使開發人員可以用該語言編寫後端程式碼。
通常支援多執行緒的後端語言具有各種機制 機製,用於線上程和其他面向執行緒的功能之間同步數據。要向 JavaScript 新增對此類功能的支援,需要修改整個語言,這不是 Dahl 的目標。爲了讓純 JavaScript 支援多執行緒,他必須想一個變通方法。接下來讓我們探索一下其中的奧祕……
Node.js 使用兩種執行緒:event loop 處理的主執行緒和 worker pool 中的幾個輔助執行緒。
事件回圈是一種機制 機製,它採用回撥(函數)並註冊它們,準備在將來的某個時刻執行。它與相關的 JavaScript 程式碼在同一個執行緒中執行。當 JavaScript 操作阻塞執行緒時,事件回圈也會被阻止。
工作池是一種執行模型,它產生並處理單獨的執行緒,然後同步執行任務,並將結果返回到事件回圈。事件回圈使用返回的結果執行提供的回撥。
簡而言之,它負責非同步 I/O操作 —— 主要是與系統磁碟和網路的互動。它主要由諸如 fs
(I/O 密集)或 crypto
(CPU 密集)等模組使用。工作池用 libuv 實現,當 Node 需要在 JavaScript 和 C++ 之間進行內部通訊時,會導致輕微的延遲,但這幾乎不可察覺。
基於這兩種機制 機製,我們可以編寫如下程式碼:
fs.readFile(path.join(__dirname, './package.json'), (err, content) => { if (err) { return null; } console.log(content.toString()); });
前面提到的 fs
模組告訴工作池使用其中一個執行緒來讀取檔案的內容,並在完成後通知事件回圈。然後事件回圈獲取提供的回撥函數,並用檔案的內容執行它。
以上是非阻塞程式碼的範例,我們不必同步等待某事的發生。只需告訴工作池去讀取檔案,並用結果去呼叫提供的函數即可。由於工作池有自己的執行緒,因此事件回圈可以在讀取檔案時繼續正常執行。
在不需要同步執行某些複雜操作時,這一切都相安無事:任何執行時間太長的函數都會阻塞執行緒。如果應用程式中有大量這類功能,就可能會明顯降低伺服器的吞吐量,甚至完全凍結它。在這種情況下,無法繼續將工作委派給工作池。
在需要對數據進行復雜的計算時(如AI、機器學習或大數據)無法真正有效地使用 Node.js,因爲操作阻塞了主(且唯一)執行緒,使伺服器無響應。在 Node.js v10.5.0 發佈之前就是這種情況,在這一版本增加了對多執行緒的支援。
worker_threads
模組允許我們建立功能齊全的多執行緒 Node.js 程式。
thread worker 是在單獨的執行緒中生成的一段程式碼(通常從檔案中取出)。
注意,術語 thread worker,worker 和 thread 經常互換使用,他們都指的是同一件事。
要想使用 thread worker,必須匯入 worker_threads
模組。讓我們先寫一個函數來幫助我們生成這些thread worker,然後再討論它們的屬性。
type WorkerCallback = (err: any, result?: any) => any; export function runWorker(path: string, cb: WorkerCallback, workerData: object | null = null) { const worker = new Worker(path, { workerData }); worker.on('message', cb.bind(null, null)); worker.on('error', cb); worker.on('exit', (exitCode) => { if (exitCode === 0) { return null; } return cb(new Error(`Worker has stopped with code ${exitCode}`)); }); return worker; }
要建立一個 worker,首先必須建立一個 Worker
類的範例。它的第一個參數提供了包含 worker 的程式碼的檔案的路徑;第二個參數提供了一個名爲 workerData
的包含一個屬性的物件。這是我們希望執行緒在開始執行時可以存取的數據。
請注意:不管你是用的是 JavaScript, 還是最終要轉換爲 JavaScript 的語言(例如,TypeScript),路徑應該始終參照帶有 .js
或 .mjs
擴充套件名的檔案。
我還想指出爲什麼使用回撥方法,而不是返回在觸發 message
事件時將解決的 promise。這是因爲 worker 可以發送許多 message
事件,而不是一個。
正如你在上面的例子中所看到的,執行緒間的通訊是基於事件的,這意味着我們設定了 worker 在發送給定事件後呼叫的偵聽器。
以下是最常見的事件:
worker.on('error', (error) => {});
只要 worker 中有未捕獲的異常,就會發出 error
事件。然後終止 worker,錯誤可以作爲提供的回撥中的第一個參數。
worker.on('exit', (exitCode) => {});
在 worker 退出時會發出 exit
事件。如果在worker中呼叫了 process.exit()
,那麼 exitCode
將被提供給回撥。如果 worker 以 worker.terminate()
終止,則程式碼爲1。
worker.on('online', () => {});
只要 worker 停止解析 JavaScript 程式碼並開始執行,就會發出 online
事件。它不常用,但在特定情況下可以提供資訊。
worker.on('message', (data) => {});
只要 worker 將數據發送到父執行緒,就會發出 message
事件。
現在讓我們來看看如何線上程之間共用數據。
要將數據發送到另一個執行緒,可以用 port.postMessage()
方法。它的原型如下:
port.postMessage(data[, transferList])
port 物件可以是 parentPort
,也可以是 MessagePort
的範例 —— 稍後會詳細講解。
第一個參數 —— 這裏被稱爲 data
—— 是一個被複制到另一個執行緒的物件。它可以是複製演算法所支援的任何內容。
數據由結構化克隆演算法進行復制。參照自 Mozilla:
它通過遞回輸入物件來進行克隆,同時保持之前存取過的參照的對映,以避免無限遍歷回圈。
該演算法不復制函數、錯誤、屬性描述符或原型鏈。還需要注意的是,以這種方式複製物件與使用 JSON 不同,因爲它可以包含回圈參照和型別化陣列,而 JSON 不能。
由於能夠複製型別化陣列,該演算法可以線上程之間共用記憶體。
人們可能會說像 cluster
或 child_process
這樣的模組在很久以前就開始使用執行緒了。這話對,也不對。
cluster
模組可以建立多個節點範例,其中一個主進程在它們之間對請求進行路由。叢集能夠有效地增加伺服器的吞吐量;但是我們不能用 cluster
模組生成一個單獨的執行緒。
人們傾向於用 PM2 這樣的工具來集中管理他們的程式,而不是在自己的程式碼中手動執行,如果你有興趣,可以研究一下如何使用 cluster
模組。
child_process
模組可以生成任何可執行檔案,無論它是否是用 JavaScript 寫的。它和 worker_threads
非常相似,但缺少後者的幾個重要功能。
具體來說 thread workers 更輕量,並且與其父執行緒共用相同的進程 ID。它們還可以與父執行緒共用記憶體,這樣可以避免對大的數據負載進行序列化,從而更有效地來回傳遞數據。
現在讓我們看一下如何線上程之間共用記憶體。爲了共用記憶體,必須將 ArrayBuffer
或 SharedArrayBuffer
的範例作爲數據參數發送到另一個執行緒。
這是一個與其父執行緒共用記憶體的 worker:
import { parentPort } from 'worker_threads'; parentPort.on('message', () => { const numberOfElements = 100; const sharedBuffer = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT * numberOfElements); const arr = new Int32Array(sharedBuffer); for (let i = 0; i < numberOfElements; i += 1) { arr[i] = Math.round(Math.random() * 30); } parentPort.postMessage({ arr }); });
首先,我們建立一個 SharedArrayBuffer
,其記憶體需要包含100個32位元整數。接下來建立一個 Int32Array
範例,它將用緩衝區來儲存其結構,然後用一些亂數填充陣列並將其發送到父執行緒。
在父執行緒中:
import path from 'path'; import { runWorker } from '../run-worker'; const worker = runWorker(path.join(__dirname, 'worker.js'), (err, { arr }) => { if (err) { return null; } arr[0] = 5; }); worker.postMessage({});
把 arr [0]
的值改爲5
,實際上會在兩個執行緒中修改它。
當然,通過共用記憶體,我們冒險在一個執行緒中修改一個值,同時也在另一個執行緒中進行了修改。但是我們在這個過程中也得到了一個好處:該值不需要進行序列化就可以另一個執行緒中使用,這極大地提高了效率。只需記住管理數據正確的參照,以便在完成數據處理後對其進行垃圾回收。
共用一個整數陣列固然很好,但我們真正感興趣的是共用物件 —— 這是儲存資訊的預設方式。不幸的是,沒有 SharedObjectBuffer
或類似的東西,但我們可以自己建立一個類似的結構。
transferList
中只能包含 ArrayBuffer
和 MessagePort
。一旦它們被傳送到另一個執行緒,就不能再次被傳送了;因爲記憶體裡的內容已經被移動到了另一個執行緒。
目前,還不能通過 transferList
(可以使用 child_process
模組)來傳輸網路通訊端。
執行緒之間的通訊是通過 port 進行的,port 是 MessagePort
類的範例,並啓用基於事件的通訊。
使用 port 線上程之間進行通訊的方法有兩種。第一個是預設值,這個方法比較容易。在 worker 的程式碼中,我們從worker_threads
模組匯入一個名爲 parentPort
的物件,並使用物件的 .postMessage()
方法將訊息發送到父執行緒。
這是一個例子:
import { parentPort } from 'worker_threads'; const data = { // ... }; parentPort.postMessage(data);
parentPort
是 Node.js 在幕後建立的 MessagePort
範例,用於與父執行緒進行通訊。這樣就可以用 parentPort
和 worker
物件線上程之間進行通訊。
執行緒間的第二種通訊方式是建立一個 MessageChannel
並將其發送給 worker。以下程式碼是如何建立一個新的 MessagePort
並與我們的 worker 共用它:
import path from 'path'; import { Worker, MessageChannel } from 'worker_threads'; const worker = new Worker(path.join(__dirname, 'worker.js')); const { port1, port2 } = new MessageChannel(); port1.on('message', (message) => { console.log('message from worker:', message); }); worker.postMessage({ port: port2 }, [port2]);
在建立 port1
和 port2
之後,我們在 port1
上設定事件監聽器並將 port2
發送給 worker。我們必須將它包含在 transferList
中,以便將其傳輸給 worker 。
在 worker 內部:
import { parentPort, MessagePort } from 'worker_threads'; parentPort.on('message', (data) => { const { port }: { port: MessagePort } = data; port.postMessage('heres your message!'); });
這樣,我們就能使用父執行緒發送的 port 了。
使用 parentPort
不一定是錯誤的方法,但最好用 MessageChannel
的範例建立一個新的 MessagePort
,然後與生成的 worker 共用它。
請注意,在後面的例子中,爲了簡便起見,我用了 parentPort
。
可以通過兩種方式使用 worker。第一種是生成一個 worker,然後執行它的程式碼,並將結果發送到父執行緒。通過這種方法,每當出現新任務時,都必須重新建立一個工作者。
第二種方法是生成一個 worker 併爲 message
事件設定監聽器。每次觸發 message
時,它都會完成工作並將結果發送回父執行緒,這會使 worker 保持活動狀態以供以後使用。
Node.js 文件推薦第二種方法,因爲在建立 thread worker 時需要建立虛擬機器並解析和執行程式碼,這會產生比較大的開銷。所以這種方法比不斷產生新 worker 的效率更高。
這種方法被稱爲工作池,因爲我們建立了一個工作池並讓它們等待,在需要時排程 message
事件來完成工作。
以下是一個產生、執行然後關閉 worker 例子:
import { parentPort } from 'worker_threads'; const collection = []; for (let i = 0; i < 10; i += 1) { collection[i] = i; } parentPort.postMessage(collection);
將 collection
發送到父執行緒後,它就會退出。
下面 下麪是一個 worker 的例子,它可以在給定任務之前等待很長一段時間:
import { parentPort } from 'worker_threads'; parentPort.on('message', (data: any) => { const result = doSomething(data); parentPort.postMessage(result); });
worker_threads
模組中有一些可用的屬性:
當不在工作執行緒內操作時,該屬性爲 true
。如果你覺得有必要,可以在 worker 檔案的開頭包含一個簡單的 if
語句,以確保它只作爲 worker 執行。
import { isMainThread } from 'worker_threads'; if (isMainThread) { throw new Error('Its not a worker'); }
產生執行緒時包含在 worker 的建構函式中的數據。
const worker = new Worker(path, { workerData });
在工作執行緒中:
import { workerData } from 'worker_threads'; console.log(workerData.property);
前面提到的 MessagePort
範例,用於與父執行緒通訊。
分配給 worker 的唯一識別符號。
現在我們知道了技術細節,接下來實現一些東西並在實踐中檢驗學到的知識。
setTimeout
setTimeout
是一個無限回圈,顧名思義,用來檢測程式執行時間是否超時。它在回圈中檢查起始時間與給定毫秒數之和是否小於實際日期。
import { parentPort, workerData } from 'worker_threads'; const time = Date.now(); while (true) { if (time + workerData.time <= Date.now()) { parentPort.postMessage({}); break; } }
這個特定的實現產生一個執行緒,然後執行它的程式碼,最後在完成後退出。
接下來實現使用這個 worker 的程式碼。首先建立一個狀態,用它來跟蹤生成的 worker:
const timeoutState: { [key: string]: Worker } = {};
然後時負責建立 worker 並將其儲存到狀態的函數:
export function setTimeout(callback: (err: any) => any, time: number) { const id = uuidv4(); const worker = runWorker( path.join(__dirname, './timeout-worker.js'), (err) => { if (!timeoutState[id]) { return null; } timeoutState[id] = null; if (err) { return callback(err); } callback(null); }, { time, }, ); timeoutState[id] = worker; return id; }
首先,我們使用 UUID 包爲 worker 建立一個唯一的識別符號,然後用先前定義的函數 runWorker
來獲取 worker。我們還向 worker 傳入一個回撥函數,一旦 worker 發送了數據就會被觸發。最後,把 worker 儲存在狀態中並返回 id
。
在回撥函數中,我們必須檢查該 worker 是否仍然存在於該狀態中,因爲有可能會 cancelTimeout()
,這將會把它刪除。如果確實存在,就把它從狀態中刪除,並呼叫傳給 setTimeout
函數的 callback
。
cancelTimeout
函數使用 .terminate()
方法強制 worker 退出,並從該狀態中刪除該這個worker:
export function cancelTimeout(id: string) { if (timeoutState[id]) { timeoutState[id].terminate(); timeoutState[id] = undefined; return true; } return false; }
如果你有興趣,我也實現了 setInterval
,程式碼在這裏,但因爲它對執行緒什麼都沒做(我們重用setTimeout
的程式碼),所以我決定不在這裏進行解釋。
我已經建立了一個短小的測試程式碼,目的是檢查這種方法與原生方法的不同之處。你可以在這裏找到程式碼。這些是結果:
native setTimeout { ms: 7004, averageCPUCost: 0.1416 } worker setTimeout { ms: 7046, averageCPUCost: 0.308 }
我們可以看到 setTimeout
有一點延遲 - 大約40ms - 這時 worker 被建立時的消耗。平均 CPU 成本也略高,但沒什麼難以忍受的(CPU 成本是整個過程持續時間內 CPU 使用率的平均值)。
如果我們可以重用 worker,就能夠降低延遲和 CPU 使用率,這就是要實現工作池的原因。
如上所述,工作池是給定數量的被事先建立的 worker,他們保持空閒並監聽 message
事件。一旦 message
事件被觸發,他們就會開始工作併發回結果。
爲了更好地描述我們將要做的事情,下面 下麪我們來建立一個由八個 thread worker 組成的工作池:
const pool = new WorkerPool(path.join(__dirname, './test-worker.js'), 8);
如果你熟悉限制併發操作,那麼你在這裏看到的邏輯幾乎相同,只是一個不同的用例。
如上面的程式碼片段所示,我們把指向 worker 的路徑和要生成的 worker 數量傳給了 WorkerPool
的建構函式。
export class WorkerPool<T, N> { private queue: QueueItem<T, N>[] = []; private workersById: { [key: number]: Worker } = {}; private activeWorkersById: { [key: number]: boolean } = {}; public constructor(public workerPath: string, public numberOfThreads: number) { this.init(); } }
這裏還有其他一些屬性,如 workersById
和 activeWorkersById
,我們可以分別儲存現有的 worker 和當前正在執行的 worker 的 ID。還有 queue
,我們可以使用以下結構來儲存物件:
type QueueCallback<N> = (err: any, result?: N) => void; interface QueueItem<T, N> { callback: QueueCallback<N>; getData: () => T; }
callback
只是預設的節點回撥,第一個參數是錯誤,第二個參數是可能的結果。 getData
是傳遞給工作池 .run()
方法的函數(如下所述),一旦專案開始處理就會被呼叫。 getData
函數返回的數據將傳給工作執行緒。
在 .init()
方法中,我們建立了 worker 並將它們儲存在以下狀態中:
private init() { if (this.numberOfThreads < 1) { return null; } for (let i = 0; i < this.numberOfThreads; i += 1) { const worker = new Worker(this.workerPath); this.workersById[i] = worker; this.activeWorkersById[i] = false; } }
爲避免無限回圈,我們首先要確保執行緒數 > 1。然後建立有效的 worker 數,並將它們的索引儲存在 workersById
狀態。我們在 activeWorkersById
狀態中儲存了它們當前是否正在執行的資訊,預設情況下該狀態始終爲false。
現在我們必須實現前面提到的 .run()
方法來設定一個 worker 可用的任務。
public run(getData: () => T) { return new Promise<N>((resolve, reject) => { const availableWorkerId = this.getInactiveWorkerId(); const queueItem: QueueItem<T, N> = { getData, callback: (error, result) => { if (error) { return reject(error); } return resolve(result); }, }; if (availableWorkerId === -1) { this.queue.push(queueItem); return null; } this.runWorker(availableWorkerId, queueItem); }); }
在 promise 函數裡,我們首先通過呼叫 .getInactiveWorkerId()
來檢查是否存在空閒的 worker 可以來處理數據:
private getInactiveWorkerId(): number { for (let i = 0; i < this.numberOfThreads; i += 1) { if (!this.activeWorkersById[i]) { return i; } } return -1; }
接下來,我們建立一個 queueItem
,在其中儲存傳遞給 .run()
方法的 getData
函數以及回撥。在回撥中,我們要麼 resolve
或者 reject
promise,這取決於 worker 是否將錯誤傳遞給回撥。
如果 availableWorkerId
的值是 -1,意味着當前沒有可用的 worker,我們將 queueItem
新增到 queue
。如果有可用的 worker,則呼叫 .runWorker()
方法來執行 worker。
在 .runWorker()
方法中,我們必須把當前 worker 的 activeWorkersById
設定爲使用狀態;爲 message
和 error
事件設定事件監聽器(並在之後清理它們);最後將數據發送給 worker。
private async runWorker(workerId: number, queueItem: QueueItem<T, N>) { const worker = this.workersById[workerId]; this.activeWorkersById[workerId] = true; const messageCallback = (result: N) => { queueItem.callback(null, result); cleanUp(); }; const errorCallback = (error: any) => { queueItem.callback(error); cleanUp(); }; const cleanUp = () => { worker.removeAllListeners('message'); worker.removeAllListeners('error'); this.activeWorkersById[workerId] = false; if (!this.queue.length) { return null; } this.runWorker(workerId, this.queue.shift()); }; worker.once('message', messageCallback); worker.once('error', errorCallback); worker.postMessage(await queueItem.getData()); }
首先,通過使用傳遞的 workerId
,我們從 workersById
中獲得 worker 參照。然後,在 activeWorkersById
中,將 [workerId]
屬性設定爲true,這樣我們就能知道在 worker 在忙,不要執行其他任務。
接下來,分別建立 messageCallback
和 errorCallback
用來在訊息和錯誤事件上呼叫,然後註冊所述函數來監聽事件並將數據發送給 worker。
在回撥中,我們呼叫 queueItem
的回撥,然後呼叫 cleanUp
函數。在 cleanUp
函數中,要刪除事件偵聽器,因爲我們會多次重用同一個 worker。如果沒有刪除監聽器的話就會發生記憶體漏失,記憶體會被慢慢耗盡。
在 activeWorkersById
狀態中,我們將 [workerId]
屬性設定爲 false
,並檢查佇列是否爲空。如果不是,就從 queue
中刪除第一個專案,並用另一個 queueItem
再次呼叫 worker。
接着建立一個在收到 message
事件中的數據後進行一些計算的 worker:
import { isMainThread, parentPort } from 'worker_threads'; if (isMainThread) { throw new Error('Its not a worker'); } const doCalcs = (data: any) => { const collection = []; for (let i = 0; i < 1000000; i += 1) { collection[i] = Math.round(Math.random() * 100000); } return collection.sort((a, b) => { if (a > b) { return 1; } return -1; }); }; parentPort.on('message', (data: any) => { const result = doCalcs(data); parentPort.postMessage(result); });
worker 建立了一個包含 100 萬個亂數的陣列,然後對它們進行排序。只要能夠多花費一些時間才能 纔能完成,做些什麼事情並不重要。
以下是工作池簡單用法的範例:
const pool = new WorkerPool<{ i: number }, number>(path.join(__dirname, './test-worker.js'), 8); const items = [...new Array(100)].fill(null); Promise.all( items.map(async (_, i) => { await pool.run(() => ({ i })); console.log('finished', i); }), ).then(() => { console.log('finished all'); });
首先建立一個由八個 worker 組成的工作池。然後建立一個包含 100 個元素的陣列,對於每個元素,我們在工作池中執行一個任務。開始執行後將立即執行八個任務,其餘任務被放入佇列並逐個執行。通過使用工作池,我們不必每次都建立一個 worker,從而大大提高了效率。
worker_threads
提供了一種爲程式新增多執行緒支援的簡單的方法。通過將繁重的 CPU 計算委託給其他執行緒,可以顯着提高伺服器的吞吐量。通過官方執行緒支援,我們可以期待更多來自AI、機器學習和大數據等領域的開發人員和工程師使用 Node.js.
英文原文地址:https://blog.logrocket.com/a-complete-guide-to-threads-in-node-js-4fa3898fe74f
相關推薦:
以上就是深入瞭解Node.js多執行緒(指南)的詳細內容,更多請關注php中文網其它相關文章!