當你在鍵盤上輸入字元,從磁碟讀取檔案或在網上下載檔案時,一股資訊流(bits)在流經不同的裝置和應用。
如果你學會處理這些位元組流,你將能構建高效能且有價值的應用。例如,試想一下當你在 YouTube 觀看視訊時,你不需要一直等待直到完整的視訊下載完。一旦有一個小緩衝,視訊就會開始播放,而剩下的會在你觀看時繼續下載。
包含一個內建模組 stream
可以讓我們處理流資料。在這篇文章中,我們將通過幾個簡單的範例來講解 stream
的用法,我們也會描述在面對複雜案例構建高效能應用時,應該如何構建管道去合併不同的流。
在我們深入理解應用構建前,理解 Node.js stream
模組提供的特性很重要。
讓我們開始吧!
Node.js stream
提供了四種型別的流
更多詳情請檢視 Node.js 官方檔案
https://nodejs.org/api/stream.html#stream_types_of_streams
讓我們在高層面來看看每一種流型別吧。
可讀流可以從一個特定的資料來源中讀取資料,最常見的是從一個檔案系統中讀取。Node.js 應用中其他常見的可讀流用法有:
process.stdin
-通過 stdin
在終端應用中讀取使用者輸入。http.IncomingMessage
- 在 HTTP 服務中讀取傳入的請求內容或者在 HTTP 使用者端中讀取伺服器的 HTTP 響應。你可以使用可寫流將來自應用的資料寫入到特定的地方,比如一個檔案。
process.stdout
可以用來將資料寫成標準輸出且被 console.log
內部使用。
接下來是雙工流和轉換流,可以被定義為基於可讀流和可寫流的混合流型別。
雙工流是可讀流和可寫流的結合,它既可以將資料寫入到特定的地方也可以從資料來源讀取資料。最常見的雙工流案例是 net.Socket
,它被用來從 socket 讀寫資料。
有一點很重要,雙工流中的可讀端和可寫端的操作是相互獨立的,資料不會從一端流向另一端。
轉換流與雙工流略有相似,但在轉換流中,可讀端和可寫端是相關聯的。
crypto.Cipher
類是一個很好的例子,它實現了加密流。通過 crypto.Cipher
流,應用可以往流的可寫端寫入純文字資料並從流的可讀端讀取加密後的密文。之所以將這種型別的流稱之為轉換流就是因為其轉換性質。
附註:另一個轉換流是 stream.PassThrough
。stream.PassThrough
從可寫端傳遞資料到可讀端,沒有任何轉換。這聽起來可能有點多餘,但 Passthrough 流對構建自定義流以及流管道非常有幫助。(比如建立一個流的資料的多個副本)
一旦可讀流連線到生產資料的源頭,比如一個檔案,就可以用幾種方法通過該流讀取資料。
首先,先建立一個名為 myfile
的簡單的 text 檔案,85 位元組大小,包含以下字串:
Lorem ipsum dolor sit amet, consectetur adipiscing elit. Curabitur nec mauris turpis.
現在,我們看下從可讀流讀取資料的兩種不同方式。
data
事件從可讀流讀取資料的最常見方式是監聽流發出的 data
事件。以下程式碼演示了這種方式:
const fs = require('fs') const readable = fs.createReadStream('./myfile', { highWaterMark: 20 }); readable.on('data', (chunk) => { console.log(`Read ${chunk.length} bytes\n"${chunk.toString()}"\n`); })
highWaterMark
屬性作為一個選項傳遞給 fs.createReadStream
,用於決定該流中有多少資料緩衝。然後資料被衝到讀取機制(在這個案例中,是我們的 data
處理程式)。預設情況下,可讀 fs
流的 highWaterMark
值是 64kb。我們刻意重寫該值為 20 位元組用於觸發多個 data
事件。
如果你執行上述程式,它會在五個迭代內從 myfile
中讀取 85 個位元組。你會在 console 看到以下輸出:
Read 20 bytes "Lorem ipsum dolor si" Read 20 bytes "t amet, consectetur " Read 20 bytes "adipiscing elit. Cur" Read 20 bytes "abitur nec mauris tu" Read 5 bytes "rpis."
從可讀流中讀取資料的另一種方法是使用非同步迭代器:
const fs = require('fs') const readable = fs.createReadStream('./myfile', { highWaterMark: 20 }); (async () => { for await (const chunk of readable) { console.log(`Read ${chunk.length} bytes\n"${chunk.toString()}"\n`); } })()
如果你執行這個程式,你會得到和前面例子一樣的輸出。
當一個監聽器監聽到可讀流的 data
事件時,流的狀態會切換成」流動」狀態(除非該流被顯式的暫停了)。你可以通過流物件的 readableFlowing
屬性檢查流的」流動」狀態
我們可以稍微修改下前面的例子,通過 data
處理器來示範:
const fs = require('fs') const readable = fs.createReadStream('./myfile', { highWaterMark: 20 }); let bytesRead = 0 console.log(`before attaching 'data' handler. is flowing: ${readable.readableFlowing}`); readable.on('data', (chunk) => { console.log(`Read ${chunk.length} bytes`); bytesRead += chunk.length // 在從可讀流中讀取 60 個位元組後停止閱讀 if (bytesRead === 60) { readable.pause() console.log(`after pause() call. is flowing: ${readable.readableFlowing}`); // 在等待 1 秒後繼續讀取 setTimeout(() => { readable.resume() console.log(`after resume() call. is flowing: ${readable.readableFlowing}`); }, 1000) } }) console.log(`after attaching 'data' handler. is flowing: ${readable.readableFlowing}`);
在這個例子中,我們從一個可讀流中讀取 myfile
,但在讀取 60 個位元組後,我們臨時暫停了資料流 1 秒。我們也在不同的時間列印了 readableFlowing
屬性的值去理解他是如何變化的。
如果你執行上述程式,你會得到以下輸出:
before attaching 'data' handler. is flowing: null after attaching 'data' handler. is flowing: true Read 20 bytes Read 20 bytes Read 20 bytes after pause() call. is flowing: false after resume() call. is flowing: true Read 20 bytes Read 5 bytes
我們可以用以下來解釋輸出:
當我們的程式開始時,readableFlowing
的值是 null
,因為我們沒有提供任何消耗流的機制。
在連線到 data
處理器後,可讀流變為「流動」模式,readableFlowing
變為 true
。
一旦讀取 60 個位元組,通過呼叫 pause()
來暫停流,readableFlowing
也轉變為 false
。
在等待 1 秒後,通過呼叫 resume()
,流再次切換為「流動」模式,readableFlowing
改為 `true'。然後剩下的檔案內容在流中流動。
因為有流,應用不需要在記憶體中保留大型的二進位制物件:小型的資料塊可以接收到就進行處理。
在這部分,讓我們組合不同的流來構建一個可以處理大量資料的真實應用。我們會使用一個小型的工具程式來生成一個給定檔案的 SHA-256。
但首先,我們需要建立一個大型的 4GB 的假檔案來測試。你可以通過一個簡單的 shell 命令來完成:
mkfile -n 4g 4gb_file
xfs_mkfile 4096m 4gb_file
在我們建立了假檔案 4gb_file
後,讓我們在不使用 stream
模組的情況下來生成來檔案的 SHA-256 hash。
const fs = require("fs"); const crypto = require("crypto"); fs.readFile("./4gb_file", (readErr, data) => { if (readErr) return console.log(readErr) const hash = crypto.createHash("sha256").update(data).digest("base64"); fs.writeFile("./checksum.txt", hash, (writeErr) => { writeErr && console.error(err) }); });
如果你執行以上程式碼,你可能會得到以下錯誤:
RangeError [ERR_FS_FILE_TOO_LARGE]: File size (4294967296) is greater than 2 GB at FSReqCallback.readFileAfterStat [as oncomplete] (fs.js:294:11) { code: 'ERR_FS_FILE_TOO_LARGE' }
以上報錯之所以發生是因為 JavaScript 執行時無法處理隨機的大型緩衝。執行時可以處理的最大尺寸的緩衝取決於你的作業系統結構。你可以通過使用內建的 buffer
模組裡的 buffer.constants.MAX_LENGTH
變數來檢視你作業系統快取的最大尺寸。
即使上述報錯沒有發生,在記憶體中保留大型檔案也是有問題的。我們所擁有的可用的實體記憶體會限制我們應用能使用的記憶體量。高記憶體使用率也會造成應用在 CPU 使用方面效能低下,因為垃圾回收會變得昂貴。
pipeline()
減少 APP 的記憶體佔用現在,讓我們看看如何修改應用去使用流且避免遇到這個報錯:
const fs = require("fs"); const crypto = require("crypto"); const { pipeline } = require("stream"); const hashStream = crypto.createHash("sha256"); hashStream.setEncoding('base64') const inputStream = fs.createReadStream("./4gb_file"); const outputStream = fs.createWriteStream("./checksum.txt"); pipeline( inputStream, hashStream, outputStream, (err) => { err && console.error(err) } )
在這個例子中,我們使用 crypto.createHash
函數提供的流式方法。它返回一個「轉換」流物件 hashStream
,為隨機的大型檔案生成 hash。
為了將檔案內容傳輸到這個轉換流中,我們使用 fs.createReadStream
為 4gb_file
建立了一個可讀流 inputStream
。我們將 hashStream
轉換流的輸出傳遞到可寫流 outputStream
中,而 checksum.txt
通過 fs.createWriteStream
建立的。
如果你執行以上程式,你將看見在 checksum.txt
檔案中看見 4GB 檔案的 SHA-256 hash。
pipeline()
和 pipe()
的對比在前面的案例中,我們使用 pipeline
函數來連線多個流。另一種常見的方法是使用 .pipe()
函數,如下所示:
inputStream .pipe(hashStream) .pipe(outputStream)
但這裡有幾個原因,所以並不推薦在生產應用中使用 .pipe()
。如果其中一個流被關閉或者出現報錯,pipe()
不會自動銷燬連線的流,這會導致應用記憶體洩露。同樣的,pipe()
不會自動跨流轉發錯誤到一個地方處理。
因為這些問題,所以就有了 pipeline()
,所以推薦你使用 pipeline()
而不是 pipe()
來連線不同的流。 我們可以重寫上述的 pipe()
例子來使用 pipeline()
函數,如下:
pipeline( inputStream, hashStream, outputStream, (err) => { err && console.error(err) } )
pipeline()
接受一個回撥函數作為最後一個引數。任何來自被連線的流的報錯都將觸發該回撥函數,所以可以很輕鬆的在一個地方處理報錯。
在 Node.js 中使用流有助於我們構建可以處理大型資料的高效能應用。
在這篇文章中,我們覆蓋了:
data
事件或者使用非同步迭代器來從可讀流中讀取資料。pipeline
連線多個流來減少記憶體佔用。一個簡短的警告:你很可能不會遇到太多必須使用流的場景,而基於流的方案會提高你的應用的複雜性。務必確保使用流的好處勝於它所帶來的複雜性。
更多node相關知識,請存取:!
以上就是聊聊Node.js stream 模組,看看如何構建高效能的應用的詳細內容,更多請關注TW511.COM其它相關文章!