在 JavaScript 中,一般只處理字串層面的資料,但是在 Node.js 中,需要處理網路、檔案等二進位制資料。
由此,引入了Buffer和Stream的概念,兩者都是位元組層面的操作。
Buffer 表示一塊專門存放二進位制資料的緩衝區。Stream 表示流,一種有序、有起點和終點的二進位制傳輸手段。
Stream 會從 Buffer 中讀取資料,像水在管道中流動那樣轉移資料。
本系列所有的範例原始碼都已上傳至Github,點選此處獲取。
Buffer 是 JavaScript 中的 Uint8Array 的子類,Uint8Array 是一種型別化陣列,處理 8 位無符號整數。
其行為類似於陣列(有 length 屬性,可迭代等),但並不是真正的陣列,其元素是 16 進位制的兩位數。
Buffer 在建立時就會確定佔用記憶體的大小,之後就無法再調整,並且它會被分配一塊 V8 堆疊外的原始記憶體。
Buffer 的應用場景比較多,例如在zlib模組中,利用 Buffer 來操作二進位制資料實現資源壓縮的功能;在crypto模組的一些加密演演算法,也會使用 Buffer。
1)建立
在 Node 版本 <= 6 時,建立 Buffer 範例是 通過建構函式建立的:new Buffer(),但後面的版本就廢棄了。
現在常用的建立方法有:
2)編碼
在建立一個 Buffer 範例後,就可以像陣列那樣存取某個字元,而列印出的值是數位,如下所示,這些數位是 Unicode 碼。
let buf = Buffer.from('strick') console.log(buf[0]); // 115 console.log(buf[1]); // 116
若在建立時包含中文字元,那麼就會多 3 個 16 進位制的兩位數,如下所示。
let buf = Buffer.from('strick') console.log(buf); // <Buffer 73 74 72 69 63 6b> buf = Buffer.from('strick平') console.log(buf); // <Buffer 73 74 72 69 63 6b e5 b9 b3>
Buffer.from() 的第二個引數是編碼,預設值是 utf8,而 1 箇中文字元經過 UTF-8 編碼後通常會佔用 3 個位元組,1 個英文字元只佔用 1 個位元組。
在呼叫 toString() 方法後就能根據指定編碼(不傳預設是 UTF-8)將 Buffer 解碼為字串。
console.log(buf.toString()); // strick平
Node.js 支援的其他編碼包括 latin1、base64、ascii 等,具體可參考官方檔案。
3)記憶體分配原理
Node.js 記憶體分配都是在 C++ 層面完成的,採用 Slab 分配器(Linux 中有廣泛應用)動態分配記憶體,並且以 8KB 為界限來區分是小物件還是大物件(參考自深入淺出Node.js)。
可以簡單看下Buffer.from()的原始碼,當它的引數是字串時,其內部會呼叫 fromStringFast() 函數(在src/lib/buffer.js中),然後根據位元組長度分別處理。
如果當前所佔記憶體不夠,那麼就會呼叫 createPool() 擴容,通過呼叫 createUnsafeBuffer() 建立 Buffer,其中 FastBuffer 繼承自 Uint8Array。
// 以 8KB 為界限 Buffer.poolSize = 8 * 1024; // Buffer.from() 內會呼叫此函數 function fromStringFast(string, ops) { const length = ops.byteLength(string); // 長度大於 4KB(>>> 表示無符號右移 1 位) if (length >= (Buffer.poolSize >>> 1)) return createFromString(string, ops.encodingVal); // 當前所佔記憶體不夠(poolOffset 記錄已經使用的位元組數) if (length > (poolSize - poolOffset)) createPool(); let b = new FastBuffer(allocPool, poolOffset, length); const actual = ops.write(b, string, 0, length); if (actual !== length) { // byteLength() may overestimate. That's a rare case, though. b = new FastBuffer(allocPool, poolOffset, actual); } poolOffset += actual; alignPool(); return b; } // 初始化一個 8 KB 的記憶體空間 function createPool() { poolSize = Buffer.poolSize; allocPool = createUnsafeBuffer(poolSize).buffer; markAsUntransferable(allocPool); poolOffset = 0; } // 建立 Buffer function createUnsafeBuffer(size) { zeroFill[0] = 0; try { return new FastBuffer(size); } finally { zeroFill[0] = 1; } } // FastBuffer 繼承自 Uint8Array class FastBuffer extends Uint8Array {}
流(Stream)的概念最早見於 Unix 系統,是一種已被證實有效的程式設計方式。
Node.js 內建的流模組會被其他多個核心模組所依賴,它具有可讀、可寫或可讀寫的特點,並且所有的流都是 EventEmitter 的範例,也就是說被賦予了非同步的能力。
官方總結了流的兩個優點,分別是:
1)流型別
流的基本型別有4種:
來看一個官方的 Readable 流範例,先是用 fs.readFile() 直接將整個檔案讀到記憶體中。當檔案很大或並行量很高時,將消耗大量的記憶體。
const http = require('http') const fs = require('fs') http.createServer(function(req, res) { fs.readFile(__dirname + '/data.txt', (err, data) => { res.end(data) }) }).listen(1234)
再用 fs.createReadStream() 方法通過流的方式來讀取檔案,其中 req 和 res 兩個引數也是流物件。
data.txt 檔案中的內容將會一段段的傳輸給 HTTP 使用者端,而不是等到讀取完了再一次性響應,兩者對比,高下立判。
http.createServer((req, res) => { const readable = fs.createReadStream(__dirname + '/data.txt') readable.pipe(res); }).listen(1234)
2)pipe()
在上面的範例中,pipe() 方法的作用是將一個可讀流 readable 變數中的資料傳輸到一個可寫流 res 變數(也叫目標流)中。
pipe() 方法地主要目的是平衡讀取和寫入的速度,讓資料的流動達到一個可接受的水平,防止因為讀寫速度的差異,而導致記憶體被佔滿。
在 pipe() 函數內部會監聽可讀流的 data 事件,並且會自動呼叫可寫流的 end() 方法。
當內部緩衝大於設定的最高水位線(highWaterMark)時,也就是讀取速度大於寫入速度時,為了避免產生背壓問題,Node.js 就會停止資料流動。
當再次重啟流動時,會觸發 drain 事件,其具體實現可參考此文。
pipe() 方法會返回目標流,雖然支援鏈式呼叫,但必須是 Duplex 或 Transform 流,否則會報錯,如下所示。
http.createServer((req, res) => { const readable = fs.createReadStream(__dirname + '/data.txt') const writable = fs.createWriteStream(__dirname + '/tmp.txt') // Error [ERR_STREAM_CANNOT_PIPE]: Cannot pipe, not readable readable.pipe(writable).pipe(res); }).listen(1234)
3)end()
很多時候寫入流是不需要手動呼叫 end() 方法來關閉的。但如果在讀取期間發生錯誤,那就不能關閉寫入流,發生記憶體漏失。
為了防止這種情況發生,可監聽可讀流的錯誤事件,手動關閉,如下所示。
readable.on('error', function(err) { writeable.close(); });
接下來看一種網路場景,改造一下之前的範例,讓可讀流監聽 data、end 和 error 事件,當讀取完畢或出現錯誤時關閉可寫流。
http.createServer((req, res) => { const readable = fs.createReadStream(__dirname + '/data.txt') readable.on('data', chunk => { res.write(chunk); }); readable.on('end',() => { res.end(); }) readable.on('error', err => { res.end('File not found'); }); }).listen(1234)
若不手動關閉,那麼頁面將一直處於載入中,在KOA原始碼中,多處呼叫了此方法。
注意,若取消對 data 事件的監聽,那麼頁面也會一直處於載入中,因為流一開始是靜止的,只有在註冊 data 事件後才會開始活動。
4)大JSON檔案
網上看到的一道題,用 Node.js 處理一個很大的 JSON 檔案,並且要讀取到 JSON 檔案的某個欄位。
直接用 fs.readFile() 或 require() 讀取都會佔用很大的記憶體,甚至超出電腦記憶體。
直接用 fs.createReadStream() 也不行,讀到的資料不能格式化成 JSON 物件,難以讀取欄位。
CNode論壇上對此問題也做過專門的討論。
藉助開源庫JSONStream可以實現要求,它基於jsonparse,這是一個流式 JSON 解析器。
JSONStream 的原始碼去掉註釋和空行差不多 200 行左右,在此就不展開分析了。
參考資料:
Node.js 語法基礎 —— Buffter & Stream