node中的stream(流)有幾種型別

2022-07-12 22:00:30

node stream有4種型別:1、Readable(可讀流)。需要實現「_read」方法來返回內容;2、Writable(可寫流),需要實現「_write」方法來接受內容;3、Duplex(可讀可寫流),需要實現「_read」和「_write」方法來接受和返回內容;4、Transform(轉換流),需要實現「_transform」方法來把接受的內容轉換之後返回內容。

本教學操作環境:windows7系統、nodejs16版,DELL G3電腦。

流(Stream)在 Nodejs 中是個十分基礎的概念,很多基礎模組都是基於流實現的,扮演著十分重要的角色。同時流也是是一個十分難以理解的概念,這主要是相關的檔案比較缺少,對於 NodeJs 初學者來說,理解流往往需要花很多時間理解,才能真正掌握這個概念,所幸的是,對於大部分 NodeJs 使用者來說,僅僅是用來開發 Web 應用,對流的不充分認識並不影響使用。但是,理解流能夠對 NodeJs 中的其他模組有更好的理解,同時在某些情況下,使用流來處理資料會有更好的效果。

Stream

Stream 是在 Node.js 中處理流資料的抽象介面。Stream 並不是一個實際的介面,而是對所有流的一種統稱。實際的介面有 ReadableStream、 WritableStream、ReadWriteStream 這幾個。

interface ReadableStream extends EventEmitter {
    readable: boolean;
    read(size?: number): string | Buffer;
    setEncoding(encoding: BufferEncoding): this;
    pause(): this;
    resume(): this;
    isPaused(): boolean;
    pipe<T extends WritableStream>(destination: T, options?: { end?: boolean | undefined; }): T;
    unpipe(destination?: WritableStream): this;
    unshift(chunk: string | Uint8Array, encoding?: BufferEncoding): void;
    wrap(oldStream: ReadableStream): this;
    [Symbol.asyncIterator](): AsyncIterableIterator<string | Buffer>;
}

interface WritableStream extends EventEmitter {
    writable: boolean;
    write(buffer: Uint8Array | string, cb?: (err?: Error | null) => void): boolean;
    write(str: string, encoding?: BufferEncoding, cb?: (err?: Error | null) => void): boolean;
    end(cb?: () => void): this;
    end(data: string | Uint8Array, cb?: () => void): this;
    end(str: string, encoding?: BufferEncoding, cb?: () => void): this;
}

interface ReadWriteStream extends ReadableStream, WritableStream { }

可以看出 ReadableStream 和 WritableStream 都是繼承 EventEmitter 類的介面(ts中介面是可以繼承類的,因為他們只是在進行型別的合併)。

上面這些介面對應的實現類分別是 Readable、Writable 和 Duplex

NodeJs中的流有4種:

  • Readable 可讀流(實現ReadableStream)

  • Writable 可寫流(實現WritableStream)

  • Duplex 可讀可寫流(繼承Readable後實現WritableStream)

  • Transform 轉換流(繼承Duplex)

它們都有要實現的方法:

  • Readable 需要實現 _read 方法來返回內容

  • Writable 需要實現 _write 方法來接受內容

  • Duplex 需要實現 _read 和 _write 方法來接受和返回內容

  • Transform 需要實現 _transform 方法來把接受的內容轉換之後返回

Readable

可讀流(Readable)是流的一種型別,他有兩種模式三種狀態

兩種讀取模式:

  • 流動模式:資料會從底層系統讀取寫入到緩衝區,當緩衝區被寫滿後自動通過 EventEmitter 儘快的將資料傳遞給所註冊的事件處理程式中

  • 暫停模式:在這種模式下將不會主動觸發 EventEmitter 傳輸資料,必須顯示的呼叫 Readable.read() 方法來從緩衝區中讀取資料,read 會觸發響應到 EventEmitter 事件。

三種狀態:

  • readableFlowing === null(初始狀態)

  • readableFlowing === false(暫停模式)

  • readableFlowing === true(流動模式)

初始時流的 readable.readableFlowingnull

新增data事件後變為 true 。呼叫 pause()unpipe()、或接收到背壓或者新增 readable 事件,則 readableFlowing 會被設為 false ,在這個狀態下,為 data 事件繫結監聽器不會使 readableFlowing 切換到 true

呼叫 resume() 可以讓可讀流的 readableFlowing 切換到 true

移除所有的 readable 事件是使 readableFlowing 變為 null 的唯一方法。

事件名說明
readable當緩衝區有新的可讀取資料時觸發(每一個想快取池插入節點都會觸發)
data每一次消費資料後都會觸發,引數是本次消費的資料
close流關閉時觸發
error流發生錯誤時觸發
方法名說明
read(size)消費長度為size的資料,返回null表示當前資料不足size,否則返回本次消費的資料。size不傳遞時表示消費快取池中所有資料
const fs = require('fs');

const readStreams = fs.createReadStream('./EventEmitter.js', {
    highWaterMark: 100// 快取池浮標值
})

readStreams.on('readable', () => {
    console.log('緩衝區滿了')
    readStreams.read()// 消費快取池的所有資料,返回結果並且觸發data事件
})


readStreams.on('data', (data) => {
    console.log('data')
})

https://github1s.com/nodejs/node/blob/v16.14.0/lib/internal/streams/readable.js#L527

當 size 為 0 會觸發 readable 事件。

當快取池中的資料長度達到浮標值 highWaterMark 後,就不會在主動請求生產資料,而是等待資料被消費後在生產資料

暫停狀態的流如果不呼叫 read 來消費資料時,後續也不會在觸發 datareadable,當呼叫 read 消費時會先判斷本次消費後剩餘的資料長度是否低於 浮標值,如果低於 浮標值 就會在消費前請求生產資料。這樣在 read 後的邏輯執行完成後新的資料大概率也已經生產完成,然後再次觸發 readable,這種提前生產下一次消費的資料存放在快取池的機制也是快取流為什麼快的原因

流動狀態下的流有兩種情況

  • 生產速度慢於消費速度時:這種情況下每一個生產資料後一般快取池中都不會有剩餘資料,直接將本次生產的資料傳遞給 data 事件即可(因為沒有進入快取池,所以也不用呼叫 read 來消費),然後立即開始生產新資料,待上一次資料消費完後新資料才生產好,再次觸發 data ,一隻到流結束。
  • 生產速度快於消費速度時:此時每一次生產完資料後一般快取池都還存在未消費的資料,這種情況一般會在消費資料時開始生產下一次消費的資料,待舊資料消費完後新資料已經生產完並且放入快取池

他們的區別僅僅在於資料生產後快取池是否還存在資料,如果存在資料則將生產的資料 push 到快取池等待消費,如果不存在則直接將資料交給 data 而不加入快取池。

值得注意的是當一個快取池中存在資料的流從暫停模式進入的流動模式時,會先回圈呼叫 read 來消費資料只到返回 null

暫停模式

1.png

暫停模式下,一個可讀流讀建立時,模式是暫停模式,建立後會自動呼叫 _read 方法,把資料從資料來源 push 到緩衝池中,直到緩衝池中的資料達到了浮標值。每當資料到達浮標值時,可讀流會觸發一個 " readable " 事件,告訴消費者有資料已經準備好了,可以繼續消費。

一般來說, 'readable' 事件表明流有新的動態:要麼有新的資料,要麼到達流的盡頭。所以,資料來源的資料被讀完前,也會觸發一次 'readable' 事件;

消費者 " readable " 事件的處理常式中,通過 stream.read(size) 主動消費緩衝池中的資料。

const { Readable } = require('stream')

let count = 1000
const myReadable = new Readable({
    highWaterMark: 300,
    // 引數的 read 方法會作為流的 _read 方法,用於獲取源資料
    read(size) {
        // 假設我們的源資料上 1000 個1
        let chunk = null
        // 讀取資料的過程一般是非同步的,例如IO操作
        setTimeout(() => {
            if (count > 0) {
                let chunkLength = Math.min(count, size)
                chunk = '1'.repeat(chunkLength)
                count -= chunkLength
            }
            this.push(chunk)
        }, 500)
    }
})
// 每一次成功 push 資料到快取池後都會觸發 readable
myReadable.on('readable', () => {
    const chunk = myReadable.read()//消費當前快取池中所有資料
    console.log(chunk.toString())
})

值得注意的是, 如果 read(size) 的 size 大於浮標值,會重新計算新的浮標值,新浮標值是size的下一個二次冪(size <= 2^n,n取最小值)

//  hwm 不會大於 1GB.
const MAX_HWM = 0x40000000;
function computeNewHighWaterMark(n) {
  if (n >= MAX_HWM) {
    // 1GB限制
    n = MAX_HWM;
  } else {
    //取下一個2最高冪,以防止過度增加hwm
    n--;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    n++;
  }
  return n;
}

流動模式

2.png

所有可讀流開始的時候都是暫停模式,可以通過以下方法可以切換至流動模式:

  • 新增 " data " 事件控制程式碼;
  • 呼叫 「 resume 」方法;
  • 使用 " pipe " 方法把資料傳送到可寫流

流動模式下,緩衝池裡面的資料會自動輸出到消費端進行消費,同時,每次輸出資料後,會自動回撥 _read 方法,把資料來源的資料放到緩衝池中,如果此時快取池中不存在資料則會直接吧資料傳遞給 data 事件,不會經過快取池;直到流動模式切換至其他暫停模式,或者資料來源的資料被讀取完了( push(null) );

可讀流可以通過以下方式切換回暫停模式:

  • 如果沒有管道目標,則呼叫 stream.pause()
  • 如果有管道目標,則移除所有管道目標。呼叫 stream.unpipe() 可以移除多個管道目標。
const { Readable } = require('stream')

let count = 1000
const myReadable = new Readable({
    highWaterMark: 300,
    read(size) {
        let chunk = null
        setTimeout(() => {
            if (count > 0) {
                let chunkLength = Math.min(count, size)
                chunk = '1'.repeat(chunkLength)
                count -= chunkLength
            }
            this.push(chunk)
        }, 500)
    }
})

myReadable.on('data', data => {
    console.log(data.toString())
})

Writable

相對可讀流來說,可寫流要簡單一些。

3.png

當生產者呼叫 write(chunk) 時,內部會根據一些狀態(corked,writing等)選擇是否快取到緩衝佇列中或者呼叫 _write,每次寫完資料後,會嘗試清空快取佇列中的資料。如果緩衝佇列中的資料大小超出了浮標值(highWaterMark),消費者呼叫 write(chunk) 後會返回 false,這時候生產者應該停止繼續寫入。

那麼什麼時候可以繼續寫入呢?當緩衝中的資料都被成功 _write 之後,清空了緩衝佇列後會觸發 drain 事件,這時候生產者可以繼續寫入資料。

當生產者需要結束寫入資料時,需要呼叫 stream.end 方法通知可寫流結束。

const { Writable, Duplex } = require('stream')
let fileContent = ''
const myWritable = new Writable({
    highWaterMark: 10,
    write(chunk, encoding, callback) {// 會作為_write方法
        setTimeout(()=>{
            fileContent += chunk
            callback()// 寫入結束後呼叫
        }, 500)
    }
})

myWritable.on('close', ()=>{
    console.log('close', fileContent)
})
myWritable.write('123123')// true
myWritable.write('123123')// false
myWritable.end()

注意,在快取池中資料到達浮標值後,此時快取池中可能存在多個節點,在清空快取池的過程中(迴圈呼叫_read),並不會向可讀流一樣儘量一次消費長度為浮標值的資料,而是每次消費一個緩衝區節點,即使這個緩衝區長度於浮標值不一致也是如此

const { Writable } = require('stream')


let fileContent = ''
const myWritable = new Writable({
    highWaterMark: 10,
    write(chunk, encoding, callback) {
        setTimeout(()=>{
            fileContent += chunk
            console.log('消費', chunk.toString())
            callback()// 寫入結束後呼叫
        }, 100)
    }
})

myWritable.on('close', ()=>{
    console.log('close', fileContent)
})

let count = 0
function productionData(){
    let flag = true
    while (count <= 20 && flag){
        flag = myWritable.write(count.toString())
        count++
    }
    if(count > 20){
        myWritable.end()
    }
}
productionData()
myWritable.on('drain', productionData)

上述是一個浮標值為 10 的可寫流,現在資料來源是一個 0——20 到連續的數位字串,productionData 用於寫入資料。

  • 首先第一次呼叫 myWritable.write("0") 時,因為快取池不存在資料,所以 "0" 不進入快取池,而是直接交給 _wirtemyWritable.write("0") 返回值為 true

  • 當執行 myWritable.write("1") 時,因為 _wirtecallback 還未呼叫,表明上一次資料還未寫入完,位置保證資料寫入的有序性,只能建立一個緩衝區將 "1" 加入快取池中。後面 2-9 都是如此

  • 當執行 myWritable.write("10") 時,此時緩衝區長度為 9(1-9),還未到達浮標值, "10" 繼續作為一個緩衝區加入快取池中,此時快取池長度變為 11,所以 myWritable.write("1") 返回 false,這意味著緩衝區的資料已經足夠,我們需要等待 drain 事件通知時再生產資料。

  • 100ms過後,_write("0", encoding, callback)callback 被呼叫,表明 "0" 已經寫入完成。然後會檢查快取池中是否存在資料,如果存在則會先呼叫 _read 消費快取池的頭節點("1"),然後繼續重複這個過程直到快取池為空後觸發 drain 事件,再次執行 productionData

  • 呼叫 myWritable.write("11"),觸發第1步開始的過程,直到流結束。

Duplex

在理解了可讀流與可寫流後,雙工流就好理解了,雙工流事實上是繼承了可讀流然後實現了可寫流(原始碼是這麼寫的,但是應該說是同時實現了可讀流和可寫流更加好)。

4.png

Duplex 流需要同時實現下面兩個方法

  • 實現 _read() 方法,為可讀流生產資料

  • 實現 _write() 方法,為可寫流消費資料

上面兩個方法如何實現在上面可寫流可讀流的部分已經介紹過了,這裡需要注意的是,雙工流是存在兩個獨立的快取池分別提供給兩個流,他們的資料來源也不一樣

以 NodeJs 的標準輸入輸出流為例:

  • 當我們在控制檯輸入資料時會觸發其 data 事件,這證明他有可讀流的功能,每一次使用者鍵入回車相當於呼叫可讀的 push 方法推播生產的資料。
  • 當我們呼叫其 write 方法時也可以向控制檯輸出內容,但是不會觸發 data 事件,這說明他有可寫流的功能,而且有獨立的緩衝區,_write 方法的實現內容就是讓控制檯展示文字。
// 每當使用者在控制檯輸入資料(_read),就會觸發data事件,這是可讀流的特性
process.stdin.on('data', data=>{
    process.stdin.write(data);
})

// 每隔一秒向標準輸入流生產資料(這是可寫流的特性,會直接輸出到控制檯上),不會觸發data
setInterval(()=>{
    process.stdin.write('不是使用者控制檯輸入的資料')
}, 1000)

Transform

5.png

可以將 Duplex 流視為具有可寫流的可讀流。兩者都是獨立的,每個都有獨立的內部緩衝區。讀寫事件獨立發生。

                             Duplex Stream
                          ------------------|
                    Read  <-----               External Source
            You           ------------------|  
                    Write ----->               External Sink
                          ------------------|

Transform 流是雙工的,其中讀寫以因果關係進行。雙工流的端點通過某種轉換連結。讀取要求發生寫入。

                                 Transform Stream
                           --------------|--------------
            You     Write  ---->                   ---->  Read  You
                           --------------|--------------

對於建立 Transform 流,最重要的是要實現 _transform 方法而不是 _write 或者 _read_transform 中對可寫流寫入的資料做處理(消費)然後為可讀流生產資料。

轉換流還經常會實現一個 `_flush` 方法,他會在流結束前被呼叫,一般用於對流的末尾追加一些東西,例如壓縮檔案時的一些壓縮資訊就是在這裡加上的
const { write } = require('fs')
const { Transform, PassThrough } = require('stream')

const reurce = '1312123213124341234213423428354816273513461891468186499126412'

const transform = new Transform({
    highWaterMark: 10,
    transform(chunk ,encoding, callback){// 轉換資料,呼叫push將轉換結果加入快取池
        this.push(chunk.toString().replace('1', '@'))
        callback()
    },
    flush(callback){// end觸發前執行
        this.push('<<<')
        callback()
    }
})


// write 不斷寫入資料
let count = 0
transform.write('>>>')
function productionData() {
    let flag = true
    while (count <= 20 && flag) {
        flag = transform.write(count.toString())
        count++
    }
    if (count > 20) {
        transform.end()
    }
}
productionData()
transform.on('drain', productionData)


let result = ''
transform.on('data', data=>{
    result += data.toString()
})
transform.on('end', ()=>{
    console.log(result)
    // >>>0@23456789@0@1@2@3@4@5@6@7@8@920<<<
})

更多node相關知識,請存取:!

以上就是node中的stream(流)有幾種型別的詳細內容,更多請關注TW511.COM其它相關文章!