2017-03-03 36 views
1

我有節點應用程序收集投票提交併將它們存儲在卡桑德拉。選票存儲爲base64編碼的加密字符串。該API有一個名爲/export的端點,它應該獲得所有這些投票字符串(可能> 100萬),將它們轉換爲二進制,並將它們一個接一個地追加到votes.egd文件中。該文件應該被壓縮併發送到客戶端。我的想法是流從卡桑德拉行,轉換每個投票字符串爲二進制,並寫入WriteStream。 我想將此功能包裝在Promise中以便於使用。我有以下內容:從卡桑德拉流數據到文件考慮背壓

streamVotesToFile(query, validVotesFileBasename) { 
    return new Promise((resolve, reject) => { 
    const writeStream = fs.createWriteStream(`${validVotesFileBasename}.egd`); 

    writeStream.on('error', (err) => { 
     logger.error(`Writestream ${validVotesFileBasename}.egd error`); 
     reject(err); 
    }); 

    writeStream.on('drain',() => { 
     logger.info(`Writestream ${validVotesFileBasename}.egd error`); 
    }) 

    db.client.stream(query) 
    .on('readable', function() { 
     let row = this.read(); 
     while (row) { 
     const envelope = new Buffer(row.vote, 'base64'); 
     if(!writeStream.write(envelope + '\n')) { 
      logger.error(`Couldn't write vote`); 
     } 
     row = this.read() 
     } 
    }) 
    .on('end',() => { // No more rows from Cassandra 
     writeStream.end(); 
     writeStream.on('finish',() => { 
     logger.info(`Stream done writing`); 
     resolve(); 
     }); 
    }) 
    .on('error', (err) => { // err is a response error from Cassandra 
     reject(err); 
    }); 
    }); 
} 

當我運行它時,它將所有選票附加到文件並下載正常。但也有一堆的問題/問題,我有:

  1. 如果我做一個REQ到/export端點和這個函數運行,而它的運行的所有其他請求的應用是非常緩慢或只是不在導出請求完成之前完成。我猜是因爲事件循環被Cassandra流中的所有事件佔用(每秒數千次)?

  2. 所有的選票似乎寫入文件罰款,但我幾乎每個writeStream.write()致電false並看到相應的日誌消息(見代碼)?

  3. 我知道我需要考慮backpressure和WritableStream的'drain'事件,所以理想情況下我會使用pipe()並將選票傳遞給文件,因爲它已經構建了背壓支持(對吧?),但由於我需要處理每一行(轉換爲二進制文件,並可能在將來添加其他行字段的其他數據),我將如何使用管道來做到這一點?

回答

0

這個完美的使用案例爲TransformStream

const myTransform = new Transform({ 
    readableObjectMode: true, 
    transform(row, encoding, callback) { 
    // Transform the row into something else 
    const item = new Buffer(row['vote'], 'base64'); 
    callback(null, item); 
    } 
}); 

client.stream(query, params, { prepare: true }) 
    .pipe(myTransform) 
    .pipe(fileStream); 

見就如何落實在Node.js API Docs一個TransformStream更多信息。

+0

有道理。但是,如果我使用你的代碼^^我得到一個錯誤:TypeError:無效的非字符串/緩衝區塊在validchunk(_stream_writable.js:216:10)在Transform.Writable.write(_stream_writable.js:245:12)在ResultStream ResultStream.Readable.read(_stream_readable.js:381:10)處的ResultStream.emit(events.js:188:7)處的emitOne(events.js:96:13)處的.ondata(_stream_readable.js:555:20) – gcosta

+1

Ahh,myTransform需要是objectMode中的Transform流,因爲客戶端流將返回ObjectMode中的Readable Streams2對象。 (將objectMode設置爲true)。謝謝! – gcosta