Nodejs中什麼是可寫流?怎麼使用

2020-11-23 21:01:39

相關推薦:《》

什麼是可寫流

可寫流是對資料流向裝置的抽象,用來消費上游流過來的資料,通過可寫流程式可以把資料寫入裝置,常見的是本地磁碟檔案或者 TCP、HTTP 等網路響應。

看一個之前用過的例子

process.stdin.pipe(process.stdout);

*process.stdout* 是一個可寫流,程式把可讀流 process.stdin 傳過來的資料寫入的標準輸出裝置。在瞭解了可讀流的基礎上理解可寫流非常簡單,流就是有方向的資料,其中可讀流是資料來源,可寫流是目的地,中間的管道環節是雙向流。

可寫流使用

呼叫可寫流範例的 **write() **方法就可以把資料寫入可寫流

const fs = require('fs');
const rs = fs.createReadStream('./w.js');
const ws = fs.createWriteStream('./copy.js');

rs.setEncoding('utf-8');
rs.on('data', chunk => {
  ws.write(chunk);
});

前面提到過監聽了可讀流的 data 事件就會使可讀流進入流動模式,我們在回撥事件裡呼叫了可寫流的 write() 方法,這樣資料就被寫入了可寫流抽象的裝置中,也就是當前目錄下的 copy.js 檔案。

write() 方法有三個引數

  • chunk {String| Buffer},表示要寫入的資料
  • encoding 當寫入的資料是字串的時候可以設定編碼
  • callback 資料被寫入之後的回撥函數

自定義可寫流

和自定義可讀流類似,簡單的自定義可寫流只需要兩步

  1. 繼承 stream 模組的 Writable
  2. 實現 _write() 方法

我們來實現一個簡單的可寫流,把傳入可寫流的資料轉成大寫之後輸出到標準輸出裝置(比較好的例子可能是寫入本地磁碟檔案,但涉及過多的 fs 操作,比較麻煩,偷個懶。寫入標準輸出裝置也是一種寫入行為)

const Writable = require('stream').Writable

class OutputStream extends Writable {
    _write(chunk, enc, done) {
        // 轉大寫之後寫入標準輸出裝置
        process.stdout.write(chunk.toString().toUpperCase());
        // 此處不嚴謹,應該是監聽寫完之後才呼叫 done
        process.nextTick(done);
    }
}

module.exports = OutputStream;

和最終可寫流暴露出來的 write() 方法一樣, _write() 方法有三個引數,作用類似

  • chunk 寫入的資料,大部分時候是 buffer,除非 decodeStrings 被設定為 false
  • encoding 如果資料是字串,可以設定編碼,buffer 或者 object 模式會忽略
  • callback 資料寫入後的回撥函數,可以通知流傳入下一個資料;當出現錯誤的時候也可以設定一個 error 引數

當然其實還有一個 _writev() 方法可以實現,這個方法僅被滯留的寫入佇列呼叫,可以不實現。

範例化可寫流

有了可寫流的類之後我們可以範例化使用了,範例化可寫流的時候有幾個 option 可選,瞭解一下可以幫助我們理解後面要用的知識

  • objectMode預設是 false, 設定成 true 後 writable.write() 方法除了寫入 string 和 buffer 外,還可以寫入任意 JavaScript 物件。很有用的一個選項,後面介紹 transform 流的時候詳細介紹
  • highWaterMark每次最多寫入的資料量, Buffer 的時候預設值 16kb, objectMode 時預設值 16
  • decodeStrings是否把傳入的資料轉成 Buffer,預設是 true

這樣我們就更清楚的知道 _write() 方法傳入的引數的含義了,而且對後面介紹 back pressure 機制的理解很有幫助。

事件

和可讀流一樣,可寫流也有幾個常用的事件,有了可讀流的基礎,理解起來比較簡單

  • pipe 當可讀流呼叫 pipe() 方法向可寫流傳輸資料的時候會觸發可寫流的 pipe 事件
  • unpipe 當可讀流呼叫 unpipe() 方法移除資料傳遞的時候會觸發可寫流的 unpipe 事件

這兩個事件用於通知可寫流資料將要到來和將要被切斷,在通常情況下使用的很少。

writeable.write() 方法是有一個 bool 的返回值的,前面提到了 highWaterMark,當要求寫入的資料大於可寫流的 highWaterMark 的時候,資料不會被一次寫入,有一部分資料被滯留,這時候 writeable.write() 就會返回 false,如果可以處理完就會返回 true

drain 當之前存在滯留資料,也就是 writeable.write() 返回過 false,經過一段時間的消化,處理完了積壓資料,可以繼續寫入新資料的時候觸發(drain 的本意即為排水、枯竭,挺形象的)

除了 write() 方法可寫流還有一個常用的方法 end(),引數和 write() 方法相同,但也可以不傳入引數,表示沒有其它資料需要寫入,可寫流可以關閉了。

finish 當呼叫 writable.end() 方法,並且所有資料都被寫入底層後會觸發 finish 事件

同樣出現錯誤後會觸發 error 事件

back pressure

瞭解了這些事件,結合上之前提到的可讀流的一些知識,我們就能探討一些有意思的話題了。在最開始我們提到過用流相對於直接操作檔案的好處之一是不會把記憶體壓爆,那麼流是怎麼做到的呢?

最開始我們可能會想到因為流不是一次性把所有資料載入記憶體處理,而是一邊讀一邊寫。但我們知道一般讀取的速度會遠遠快於寫入的速度,那麼 pipe() 方法是怎麼做到供需平衡的呢?

回憶一些基礎知識,我們自己來實現一下 pipe() 方法的核心原理

  1. 可讀流有流動和暫停兩種模式,可以通過 **pause() resume() **方法切換
  2. 可寫流的 **write() **方法會返回是否能處理當前的資料,每次可以處理多少是 hignWatermark 決定的
  3. 當可寫流處理完了積壓資料會觸發 drain 事件

我們可以利用這三點來做到資料讀取和寫入的同步,還是使用之前的例子,但為了使消費速度降下來,我們各一秒再通知完成

class OutputStream extends Writable {
    _write(chunk, enc, done) {
        // 轉大寫之後寫入標準輸出裝置
        process.stdout.write(chunk.toString().toUpperCase());
        // 故意延緩通知繼續傳遞資料的時間,造成寫入速度慢的現象
        setTimeout(done, 1000);
    }
}

我們使用一下自定義的兩個類

const RandomNumberStream = require('./RandomNumberStream');
const OutputStream = require('./OutputStream');

const rns = new RandomNumberStream(100);
const os = new OutputStream({
    highWaterMark: 8 // 把水位降低,預設16k還是挺大的
});

rns.on('data', chunk => {
    // 當待處理佇列大於 highWaterMark 時返回 false
    if (os.write(chunk) === false) { 
        console.log('pause');
        rns.pause(); // 暫停資料讀取
    }
});

// 當待處理佇列小於 highWaterMark 時觸發 drain 事件
os.on('drain', () => {
    console.log('drain')
    rns.resume(); // 恢復資料讀取
});

結合前面的三點和註釋很容易看懂上面程式碼,這就是 pipe() 方法起作用的核心原理。資料的來源的去向我們有了大概瞭解,後面可以開始介紹資料的加工

  • duplex
  • transform

更多程式設計相關知識,請存取:!!

以上就是Nodejs中什麼是可寫流?怎麼使用的詳細內容,更多請關注TW511.COM其它相關文章!