2017-01-14 63 views
2

我剛創建了一個簡單的Readable和Writable流對,它可以通過pipe()連接。我有興趣創建背壓並控制Readable中讀取的速率。不過,我對如何實際實現這一點或者是否有可能使用Node.js流感到困惑。 作爲一個例子:在Node.js流中一路實現背壓

const {Writable, Readable} = require('stream'); 

    function getWritable() { 
     return new Writable({ 
      write: function (chunk, encoding, cb) { 
       console.log(' => chunk => ', String(chunk)); 
       setTimeout(cb, 1500); 
      } 
     }); 
    } 


    function getReadable(data) { 
     return new Readable({ 
      encoding: 'utf8', 
      objectMode: false, 
      read: function (n) { 
       // n => Number(16384) 
       console.log('read is called'); 
       const d = data.shift(); 
       this.push(d ? String(d) : null); 
      } 
     }); 
    } 


    const readableStrm = getReadable([1, 2, 3, 4, 5]); 

    const piped = readableStrm.pipe(getWritable()); 

    piped.on('finish', function() { 
     console.log('finish'); 
    }); 

,如果你運行上面的代碼,我們將看到「讀叫做」將得到記錄5次,可寫入的寫入方法看到數據之前很久。

我想要做的是在Writable中的寫入方法已經觸發其回調時,只在Readable中調用read();當然,read()方法必須先啓動一次,但隨後會等待寫入準備就緒。

有沒有辦法控制何時以可讀方式觸發read()方法?

最終,我真的不明白read()方法的目的是什麼。

作爲一個簡單的例子,無論我從read()返回什麼,我都無法讓它停止閱讀。閱讀方法的要點是什麼?爲什麼我們必須實現它?

const Readable = require('stream').Readable; 

const r = new Readable({ 
    objectMode: true, 
    read: function (n) { 
     console.log('is read'); 
     return false/null/true; // nothing I return here makes a difference 
    } 
}); 


r.on('data', function (d) { 
    console.log(d); 
}); 


setInterval(function(){ 
    r.push('valid'); 
},1000); 

回答

2

Node.js流功能非常強大,並提供了很多控制緩衝和數據流通過它們的緩衝。

現在回答你的問題:

A)你看到正在先讀所有數據,然後寫入被解僱,因爲你的數據流是非常小的。如果您使用千字節的數據進行了測試,您將看到按順序讀取流和寫入流。序列取決於讀流的緩衝能力和寫流創建的背壓。例如,TCP套接字讀取流將比磁盤文件寫入流快得多,從而產生背壓。

B)一個用於讀取&寫入流的強大構造函數選項是highWaterMark。您可以在Buffalo部分的documentation中瞭解更多信息。 highWaterMark明確定義了可讀/可寫流的緩衝容量。默認值是16kb。在你上面的例子中,你可以用highWaterMark設置你的可讀流爲2個字節,如下所示,你會看到不同的(在實際情況中不需要,但你可以用於學習)。的highWaterMark

function getReadable(data) { 
    let i = 0; 
    return new stream.Readable({ 
     highWaterWark: 2, // <--- highWaterMark set to 2 byte. Preferably set it to 10 and increase the length of your input array. 
     encoding: 'utf8', 
     objectMode: false, 
     read: function (n) { 
      // n => Number(16384) 
      console.log('read is called'); 
      const d = data[i++]; 
      this.push(d ? String(d) : null); 
     } 
    }); 
} 

較小的值會很快創建一個背壓,並可能有損於某些使用情況,如閱讀網絡數據。

C)您還可以控制讀取流和寫入流中的數據流。如果你的應用需要它,那麼你可以控制來自可寫流的可讀流。具體方法readable.pause(),readable.read([size]),readable.resume(), readable.push(chunk[, encoding])readable.unpipe([destination])允許您控制可讀流(甚至可寫入流)中的緩衝數據流&。實際上,您甚至可以使用方法readable.unshift(chunk)將數據從可寫入流推回到可讀流。 也有類似的方法來控制可寫入流中的數據

D)readwrite方法是您實現流的一部分。這些方法用於將流式數據發送到底層資源,不應直接從應用程序數據中調用。基本上,它定義了你的流的設置。 (不知道我是否能夠清楚地解釋這一點)。

我強烈建議您閱讀Node.js documentation on streams。它會給你很多的信息(比你在其他網站上可以找到的示例代碼更多)。

我希望以上信息對您有所幫助。

+0

感謝您的回答,我向上投票是因爲它提供了豐富的內容,但不幸的是,我不向我證明您說的是真實的。我正在尋找一些演示read()方法的代碼,以及read()中的返回值如何起作用。此外,我正在尋找可以異步獲取數據的read()的體面實現。 –

+0

我擔心讀取流的所有數據都會被緩存到內存中,因爲您在read()中使用this.push(X)...但X從哪裏來? :) –

+0

似乎讀取的返回值並不重要,並且可以異步調用this.push;但我認爲如果API採取了回調,而不是期待用戶調用this.push,它會更清晰。 –