我剛創建了一個簡單的Readable和Writable流對,它可以通過pipe()連接。我有興趣創建背壓並控制Readable中讀取的速率。不過,我對如何實際實現這一點或者是否有可能使用Node.js流感到困惑。 作爲一個例子:在Node.js流中一路實現背壓
const {Writable, Readable} = require('stream');
function getWritable() {
return new Writable({
write: function (chunk, encoding, cb) {
console.log(' => chunk => ', String(chunk));
setTimeout(cb, 1500);
}
});
}
function getReadable(data) {
return new Readable({
encoding: 'utf8',
objectMode: false,
read: function (n) {
// n => Number(16384)
console.log('read is called');
const d = data.shift();
this.push(d ? String(d) : null);
}
});
}
const readableStrm = getReadable([1, 2, 3, 4, 5]);
const piped = readableStrm.pipe(getWritable());
piped.on('finish', function() {
console.log('finish');
});
,如果你運行上面的代碼,我們將看到「讀叫做」將得到記錄5次,可寫入的寫入方法看到數據之前很久。
我想要做的是在Writable中的寫入方法已經觸發其回調時,只在Readable中調用read()
;當然,read()方法必須先啓動一次,但隨後會等待寫入準備就緒。
有沒有辦法控制何時以可讀方式觸發read()
方法?
最終,我真的不明白read()
方法的目的是什麼。
作爲一個簡單的例子,無論我從read()返回什麼,我都無法讓它停止閱讀。閱讀方法的要點是什麼?爲什麼我們必須實現它?
const Readable = require('stream').Readable;
const r = new Readable({
objectMode: true,
read: function (n) {
console.log('is read');
return false/null/true; // nothing I return here makes a difference
}
});
r.on('data', function (d) {
console.log(d);
});
setInterval(function(){
r.push('valid');
},1000);
感謝您的回答,我向上投票是因爲它提供了豐富的內容,但不幸的是,我不向我證明您說的是真實的。我正在尋找一些演示read()方法的代碼,以及read()中的返回值如何起作用。此外,我正在尋找可以異步獲取數據的read()的體面實現。 –
我擔心讀取流的所有數據都會被緩存到內存中,因爲您在read()中使用this.push(X)...但X從哪裏來? :) –
似乎讀取的返回值並不重要,並且可以異步調用this.push;但我認爲如果API採取了回調,而不是期待用戶調用this.push,它會更清晰。 –