我有節點應用程序收集投票提交併將它們存儲在卡桑德拉。選票存儲爲base64編碼的加密字符串。該API有一個名爲/export
的端點,它應該獲得所有這些投票字符串(可能> 100萬),將它們轉換爲二進制,並將它們一個接一個地追加到votes.egd文件中。該文件應該被壓縮併發送到客戶端。我的想法是流從卡桑德拉行,轉換每個投票字符串爲二進制,並寫入WriteStream
。 我想將此功能包裝在Promise中以便於使用。我有以下內容:從卡桑德拉流數據到文件考慮背壓
streamVotesToFile(query, validVotesFileBasename) {
return new Promise((resolve, reject) => {
const writeStream = fs.createWriteStream(`${validVotesFileBasename}.egd`);
writeStream.on('error', (err) => {
logger.error(`Writestream ${validVotesFileBasename}.egd error`);
reject(err);
});
writeStream.on('drain',() => {
logger.info(`Writestream ${validVotesFileBasename}.egd error`);
})
db.client.stream(query)
.on('readable', function() {
let row = this.read();
while (row) {
const envelope = new Buffer(row.vote, 'base64');
if(!writeStream.write(envelope + '\n')) {
logger.error(`Couldn't write vote`);
}
row = this.read()
}
})
.on('end',() => { // No more rows from Cassandra
writeStream.end();
writeStream.on('finish',() => {
logger.info(`Stream done writing`);
resolve();
});
})
.on('error', (err) => { // err is a response error from Cassandra
reject(err);
});
});
}
當我運行它時,它將所有選票附加到文件並下載正常。但也有一堆的問題/問題,我有:
如果我做一個REQ到
/export
端點和這個函數運行,而它的運行的所有其他請求的應用是非常緩慢或只是不在導出請求完成之前完成。我猜是因爲事件循環被Cassandra流中的所有事件佔用(每秒數千次)?所有的選票似乎寫入文件罰款,但我幾乎每個
writeStream.write()
致電false
並看到相應的日誌消息(見代碼)?我知道我需要考慮backpressure和WritableStream的'drain'事件,所以理想情況下我會使用
pipe()
並將選票傳遞給文件,因爲它已經構建了背壓支持(對吧?),但由於我需要處理每一行(轉換爲二進制文件,並可能在將來添加其他行字段的其他數據),我將如何使用管道來做到這一點?
有道理。但是,如果我使用你的代碼^^我得到一個錯誤:TypeError:無效的非字符串/緩衝區塊在validchunk(_stream_writable.js:216:10)在Transform.Writable.write(_stream_writable.js:245:12)在ResultStream ResultStream.Readable.read(_stream_readable.js:381:10)處的ResultStream.emit(events.js:188:7)處的emitOne(events.js:96:13)處的.ondata(_stream_readable.js:555:20) – gcosta
Ahh,myTransform需要是objectMode中的Transform流,因爲客戶端流將返回ObjectMode中的Readable Streams2對象。 (將objectMode設置爲true)。謝謝! – gcosta