Node.js精進(3)——流

2022-06-21 09:00:53

  在 JavaScript 中,一般只處理字串層面的資料,但是在 Node.js 中,需要處理網路、檔案等二進位制資料。

  由此,引入了BufferStream的概念,兩者都是位元組層面的操作。

  Buffer 表示一塊專門存放二進位制資料的緩衝區。Stream 表示流,一種有序、有起點和終點的二進位制傳輸手段。

  Stream 會從 Buffer 中讀取資料,像水在管道中流動那樣轉移資料。

  本系列所有的範例原始碼都已上傳至Github,點選此處獲取。

一、Buffer

  Buffer 是 JavaScript 中的 Uint8Array 的子類,Uint8Array 是一種型別化陣列,處理 8 位無符號整數。

  其行為類似於陣列(有 length 屬性,可迭代等),但並不是真正的陣列,其元素是 16 進位制的兩位數。

  Buffer 在建立時就會確定佔用記憶體的大小,之後就無法再調整,並且它會被分配一塊 V8 堆疊外的原始記憶體。

  Buffer 的應用場景比較多,例如在zlib模組中,利用 Buffer 來操作二進位制資料實現資源壓縮的功能;在crypto模組的一些加密演演算法,也會使用 Buffer。

1)建立

  在 Node 版本 <= 6 時,建立 Buffer 範例是 通過建構函式建立的:new Buffer(),但後面的版本就廢棄了。

  現在常用的建立方法有:

  • Buffer.from() :傳入已有資料,轉換成一個 Buffer 範例,資料可以是字串、物件、陣列等。
  • Buffer.alloc():分配指定位元組數量的 Buffer 範例。
  • Buffer.allocUnsafe() :功能與 Buffer.alloc() 相同,但其所佔記憶體中的舊資料不會被清除,可能會洩漏敏感資料。

2)編碼

  在建立一個 Buffer 範例後,就可以像陣列那樣存取某個字元,而列印出的值是數位,如下所示,這些數位是 Unicode 碼。

let buf = Buffer.from('strick')
console.log(buf[0]);    // 115
console.log(buf[1]);    // 116

  若在建立時包含中文字元,那麼就會多 3 個 16 進位制的兩位數,如下所示。

let buf = Buffer.from('strick')
console.log(buf);   // <Buffer 73 74 72 69 63 6b>
buf = Buffer.from('strick平')
console.log(buf);   // <Buffer 73 74 72 69 63 6b e5 b9 b3>

  Buffer.from() 的第二個引數是編碼,預設值是 utf8,而 1 箇中文字元經過 UTF-8 編碼後通常會佔用 3 個位元組,1 個英文字元只佔用 1 個位元組。

  在呼叫 toString() 方法後就能根據指定編碼(不傳預設是 UTF-8)將 Buffer 解碼為字串。

console.log(buf.toString());    // strick平

  Node.js 支援的其他編碼包括 latin1、base64、ascii 等,具體可參考官方檔案

3)記憶體分配原理

  Node.js 記憶體分配都是在 C++ 層面完成的,採用 Slab 分配器(Linux 中有廣泛應用)動態分配記憶體,並且以 8KB 為界限來區分是小物件還是大物件(參考自深入淺出Node.js)。

  可以簡單看下Buffer.from()的原始碼,當它的引數是字串時,其內部會呼叫 fromStringFast() 函數(在src/lib/buffer.js中),然後根據位元組長度分別處理。

  如果當前所佔記憶體不夠,那麼就會呼叫 createPool() 擴容,通過呼叫 createUnsafeBuffer() 建立 Buffer,其中 FastBuffer 繼承自 Uint8Array。

// 以 8KB 為界限
Buffer.poolSize = 8 * 1024;
// Buffer.from() 內會呼叫此函數
function fromStringFast(string, ops) {
  const length = ops.byteLength(string);
  // 長度大於 4KB(>>> 表示無符號右移 1 位)
  if (length >= (Buffer.poolSize >>> 1))
    return createFromString(string, ops.encodingVal);
  // 當前所佔記憶體不夠(poolOffset 記錄已經使用的位元組數)
  if (length > (poolSize - poolOffset))
    createPool();
  let b = new FastBuffer(allocPool, poolOffset, length);
  const actual = ops.write(b, string, 0, length);
  if (actual !== length) {
    // byteLength() may overestimate. That's a rare case, though.
    b = new FastBuffer(allocPool, poolOffset, actual);
  }
  poolOffset += actual;
  alignPool();
  return b;
}
// 初始化一個 8 KB 的記憶體空間
function createPool() {
  poolSize = Buffer.poolSize;
  allocPool = createUnsafeBuffer(poolSize).buffer;
  markAsUntransferable(allocPool);
  poolOffset = 0;
}
// 建立 Buffer
function createUnsafeBuffer(size) {
  zeroFill[0] = 0;
  try {
    return new FastBuffer(size);
  } finally {
    zeroFill[0] = 1;
  }
}
// FastBuffer 繼承自 Uint8Array
class FastBuffer extends Uint8Array {}

二、流

  流(Stream)的概念最早見於 Unix 系統,是一種已被證實有效的程式設計方式。

  Node.js 內建的流模組會被其他多個核心模組所依賴,它具有可讀、可寫或可讀寫的特點,並且所有的流都是 EventEmitter 的範例,也就是說被賦予了非同步的能力。

  官方總結了流的兩個優點,分別是:

  • 記憶體效率: 無需載入大量的資料到記憶體中即可進行處理。
  • 時間效率: 當獲得資料之後就能立即開始處理資料,而不必等到整個資料載入完,這樣消耗的時間就變少了。

1)流型別

  流的基本型別有4種:

  • Readable:只能讀取資料的流,例如 fs.createReadStream(),可註冊的事件包括 data、end、error、close等。
  • Writable:只能寫入資料的流,例如 fs.createWriteStream(),HTTP 的請求和響應,可註冊的事件包括 drain、error、finish、pipe 等。
  • Duplex:Readable 和 Writable 都支援的全雙工流,例如 net.Socket,這種流會維持兩個緩衝區,分別對應讀取和寫入,允許兩邊同時獨立操作。
  • Transform:在寫入和讀取資料時修改或轉換資料的 Duplex 流,例如 zlib.createDeflate()。

  來看一個官方的 Readable 流範例,先是用 fs.readFile() 直接將整個檔案讀到記憶體中。當檔案很大或並行量很高時,將消耗大量的記憶體。

const http = require('http')
const fs = require('fs')

http.createServer(function(req, res) {
  fs.readFile(__dirname + '/data.txt', (err, data) => {
    res.end(data)
  })
}).listen(1234)

  再用 fs.createReadStream() 方法通過流的方式來讀取檔案,其中 req 和 res 兩個引數也是流物件。

  data.txt 檔案中的內容將會一段段的傳輸給 HTTP 使用者端,而不是等到讀取完了再一次性響應,兩者對比,高下立判。

http.createServer((req, res) => {
  const readable = fs.createReadStream(__dirname + '/data.txt')
  readable.pipe(res);
}).listen(1234)

2)pipe()

  在上面的範例中,pipe() 方法的作用是將一個可讀流 readable 變數中的資料傳輸到一個可寫流 res 變數(也叫目標流)中。

  pipe() 方法地主要目的是平衡讀取和寫入的速度,讓資料的流動達到一個可接受的水平,防止因為讀寫速度的差異,而導致記憶體被佔滿。

  在 pipe() 函數內部會監聽可讀流的 data 事件,並且會自動呼叫可寫流的 end() 方法。

  當內部緩衝大於設定的最高水位線(highWaterMark)時,也就是讀取速度大於寫入速度時,為了避免產生背壓問題,Node.js 就會停止資料流動。

  當再次重啟流動時,會觸發 drain 事件,其具體實現可參考此文

  pipe() 方法會返回目標流,雖然支援鏈式呼叫,但必須是 Duplex 或 Transform 流,否則會報錯,如下所示。

http.createServer((req, res) => {
  const readable = fs.createReadStream(__dirname + '/data.txt')
  const writable = fs.createWriteStream(__dirname + '/tmp.txt')
  // Error [ERR_STREAM_CANNOT_PIPE]: Cannot pipe, not readable
  readable.pipe(writable).pipe(res);
}).listen(1234)

3)end()

  很多時候寫入流是不需要手動呼叫 end() 方法來關閉的。但如果在讀取期間發生錯誤,那就不能關閉寫入流,發生記憶體漏失。

  為了防止這種情況發生,可監聽可讀流的錯誤事件,手動關閉,如下所示。

readable.on('error', function(err) {
  writeable.close();
});

  接下來看一種網路場景,改造一下之前的範例,讓可讀流監聽 data、end 和 error 事件,當讀取完畢或出現錯誤時關閉可寫流。

http.createServer((req, res) => {
  const readable = fs.createReadStream(__dirname + '/data.txt')
  readable.on('data', chunk => {
    res.write(chunk);
  });
  readable.on('end',() => {
    res.end();
  })
  readable.on('error', err => {
    res.end('File not found');
  });
}).listen(1234)

  若不手動關閉,那麼頁面將一直處於載入中,在KOA原始碼中,多處呼叫了此方法。

  注意,若取消對 data 事件的監聽,那麼頁面也會一直處於載入中,因為流一開始是靜止的,只有在註冊 data 事件後才會開始活動。

4)大JSON檔案

  網上看到的一道題,用 Node.js 處理一個很大的 JSON 檔案,並且要讀取到 JSON 檔案的某個欄位。

  直接用 fs.readFile() 或 require() 讀取都會佔用很大的記憶體,甚至超出電腦記憶體。

  直接用 fs.createReadStream() 也不行,讀到的資料不能格式化成 JSON 物件,難以讀取欄位。

  CNode論壇上對此問題也做過專門的討論

  藉助開源庫JSONStream可以實現要求,它基於jsonparse,這是一個流式 JSON 解析器。

  JSONStream 的原始碼去掉註釋和空行差不多 200 行左右,在此就不展開分析了。

 

 

參考資料:

緩衝區 Stream多檔案合併 pipe 

legacy.js模組實現分析 Stream兩種模式 

Stream背壓

深入理解Node.js之Buffer 

Node.js Buffer Node.js 流

Node.js 語法基礎 —— Buffter & Stream

node原始碼分析

通過原始碼解析 Node.js 中導流(pipe)的實現