我需要構建一個處理大型CSV文件以用於bluebird.map()調用的函數。鑑於文件的可能大小,我想使用流式傳輸。NodeJS,promise,streams - 處理大型CSV文件
該函數應該接受一個流(一個CSV文件)和一個函數(處理來自流的塊),並在文件讀取結束(解析)或錯誤(拒絕)時返回一個承諾。
於是,我開始:
'use strict';
var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');
var pgp = require('pg-promise')({promiseLib: promise});
api.parsers.processCsvStream = function(passedStream, processor) {
var parser = csv.parse(passedStream, {trim: true});
passedStream.pipe(parser);
// use readable or data event?
parser.on('readable', function() {
// call processor, which may be async
// how do I throttle the amount of promises generated
});
var db = pgp(api.config.mailroom.fileMakerDbConfig);
return new Promise(function(resolve, reject) {
parser.on('end', resolve);
parser.on('error', reject);
});
}
現在,我有兩個相互關聯的問題:
- 我需要節制的實際數據量進行處理,從而不會產生內存壓力。
- 作爲
processor
參數傳遞的函數通常是異步的,例如通過基於promise的庫(現在:pg-promise
)將文件的內容保存到數據庫。因此,它會在記憶中創造一個承諾並繼續前進。
的pg-promise
圖書館功能來管理這一點,像page(),但我不能換我圍繞如何流事件處理程序與這些承諾搭配方式前進。現在,我在readable
部分的每個read()
之後返回承諾,這意味着我創建了大量承諾的數據庫操作,並最終出錯,因爲我遇到了進程內存限制。
有沒有人有這樣的工作示例,我可以用作跳點?
UPDATE:可能對皮膚貓不止一種方法,但這個工程:
'use strict';
var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');
var pgp = require('pg-promise')({promiseLib: promise});
api.parsers.processCsvStream = function(passedStream, processor) {
// some checks trimmed out for example
var db = pgp(api.config.mailroom.fileMakerDbConfig);
var parser = csv.parse(passedStream, {trim: true});
passedStream.pipe(parser);
var readDataFromStream = function(index, data, delay) {
var records = [];
var record;
do {
record = parser.read();
if(record != null)
records.push(record);
} while(record != null && (records.length < api.config.mailroom.fileParserConcurrency))
parser.pause();
if(records.length)
return records;
};
var processData = function(index, data, delay) {
console.log('processData(' + index + ') > data: ', data);
parser.resume();
};
parser.on('readable', function() {
db.task(function(tsk) {
this.page(readDataFromStream, processData);
});
});
return new Promise(function(resolve, reject) {
parser.on('end', resolve);
parser.on('error', reject);
});
}
有人看到一個潛在的問題,這種做法?
看起來很整齊,如果這個工作,那麼很棒的工作!我很高興最近在'pg-promise'中加入'page'並不是徒勞的;) –
在readDataFromStream的末尾簡化了它;)你不需要'return undefined',這就是發生了什麼當你什麼也沒有返回時); –
實際上,這可能是一個問題......當你調用db.task時,喲不會處理它的結果,所以如果它拒絕,將會有一個錯誤諾言庫,你的拒絕沒有處理。 –