2015-10-14 127 views
7

我需要構建一個處理大型CSV文件以用於bluebird.map()調用的函數。鑑於文件的可能大小,我想使用流式傳輸。NodeJS,promise,streams - 處理大型CSV文件

該函數應該接受一個流(一個CSV文件)和一個函數(處理來自流的塊),並在文件讀取結束(解析)或錯誤(拒絕)時返回一個承諾。

於是,我開始:

'use strict'; 

var _ = require('lodash'); 
var promise = require('bluebird'); 
var csv = require('csv'); 
var stream = require('stream'); 

var pgp = require('pg-promise')({promiseLib: promise}); 

api.parsers.processCsvStream = function(passedStream, processor) { 

    var parser = csv.parse(passedStream, {trim: true}); 
    passedStream.pipe(parser); 

    // use readable or data event? 
    parser.on('readable', function() { 
    // call processor, which may be async 
    // how do I throttle the amount of promises generated 
    }); 

    var db = pgp(api.config.mailroom.fileMakerDbConfig); 

    return new Promise(function(resolve, reject) { 
    parser.on('end', resolve); 
    parser.on('error', reject); 
    }); 

} 

現在,我有兩個相互關聯的問題:

  1. 我需要節制的實際數據量進行處理,從而不會產生內存壓力。
  2. 作爲processor參數傳遞的函數通常是異步的,例如通過基於promise的庫(現在:pg-promise)將文件的內容保存到數據庫。因此,它會在記憶中創造一個承諾並繼續前進。

pg-promise圖書館功能來管理這一點,像page(),但我不能換我圍​​繞如何流事件處理程序與這些承諾搭配方式前進。現在,我在readable部分的每個read()之後返回承諾,這意味着我創建了大量承諾的數據庫操作,並最終出錯,因爲我遇到了進程內存限制。

有沒有人有這樣的工作示例,我可以用作跳點?

UPDATE:可能對皮膚貓不止一種方法,但這個工程:

'use strict'; 

var _ = require('lodash'); 
var promise = require('bluebird'); 
var csv = require('csv'); 
var stream = require('stream'); 

var pgp = require('pg-promise')({promiseLib: promise}); 

api.parsers.processCsvStream = function(passedStream, processor) { 

    // some checks trimmed out for example 

    var db = pgp(api.config.mailroom.fileMakerDbConfig); 
    var parser = csv.parse(passedStream, {trim: true}); 
    passedStream.pipe(parser); 

    var readDataFromStream = function(index, data, delay) { 
    var records = []; 
    var record; 
    do { 
     record = parser.read(); 
     if(record != null) 
     records.push(record); 
    } while(record != null && (records.length < api.config.mailroom.fileParserConcurrency)) 
    parser.pause(); 

    if(records.length) 
     return records; 
    }; 

    var processData = function(index, data, delay) { 
    console.log('processData(' + index + ') > data: ', data); 
    parser.resume(); 
    }; 

    parser.on('readable', function() { 
    db.task(function(tsk) { 
     this.page(readDataFromStream, processData); 
    }); 
    }); 

    return new Promise(function(resolve, reject) { 
    parser.on('end', resolve); 
    parser.on('error', reject); 
    }); 
} 

有人看到一個潛在的問題,這種做法?

+0

看起來很整齊,如果這個工作,那麼很棒的工作!我很高興最近在'pg-promise'中加入'page'並不是徒勞的;) –

+0

在readDataFromStream的末尾簡化了它;)你不需要'return undefined',這就是發生了什麼當你什麼也沒有返回時); –

+0

實際上,這可能是一個問題......當你調用db.task時,喲不會處理它的結果,所以如果它拒絕,將會有一個錯誤諾言庫,你的拒絕沒有處理。 –

回答

3

查找下面的正確執行相同類型的任務的完整應用程序:它將文件讀取爲流,將其解析爲CSV並將每行插入到數據庫中。

const fs = require('fs'); 
const promise = require('bluebird'); 
const csv = require('csv-parse'); 
const pgp = require('pg-promise')({promiseLib: promise}); 

const cn = "postgres://postgres:[email protected]:5432/test_db"; 
const rs = fs.createReadStream('primes.csv'); 

const db = pgp(cn); 

function receiver(_, data) { 
    function source(index) { 
     if (index < data.length) { 
      // here we insert just the first column value that contains a prime number; 
      return this.none('insert into primes values($1)', data[index][0]); 
     } 
    } 

    return this.sequence(source); 
} 

db.task(t => { 
    return pgp.spex.stream.read.call(t, rs.pipe(csv()), receiver); 
}) 
    .then(data => { 
     console.log('DATA:', data); 
    } 
    .catch(error => { 
     console.log('ERROR:', error); 
    }); 

注意,我改變的唯一事情:利用圖書館csv-parse代替csv,作爲一個更好的選擇。

spex庫中增加使用方法stream.read,它正確地服務於Readable流以用於承諾。

+0

在查詢(「INSERT ...」)完成後,不會嘗試從'parser'讀取下一個項目,無論如何下一個項目是否已經可讀?或者'parser.read()'返回一個promise? – Bergi

+0

另外,OP正在尋找諾言返回的'處理器'回調函數是怎麼回事? – Bergi

+0

@Bergi我的理解是,parser.read()是同步的,它顯示的方式。如果事實證明不是這樣,那麼很明顯,它需要被包裹在承諾中。 「可讀」會被觸發一次,而不是每次讀取操作,這是我的理解。至於承諾返還處理器,他只是在數據處理完成時尋找一個解決方案,而在失敗的情況下則拒絕一個拒絕,我的示例提供了這種解決方案,即任務將相應地解決/拒絕。 –

1

那麼說,你不想流式傳輸,但某種數據塊? ;-)

你知道嗎https://github.com/substack/stream-handbook

我認爲最簡單的方法不改變你的架構會是某種承諾池。例如https://github.com/timdp/es6-promise-pool

+0

那麼,我曾想過在函數中使用'async.queue',並返回最終完成文件的承諾(或不)。但是,我想知道如何將Bluebird等承諾庫與典型的基於流的大型文件處理聯繫起來。 ('pg-promise'包含'spex',它提供了更高級別的promise函數) – alphadogg

6

你可能想看看promise-streams

var ps = require('promise-streams'); 
passedStream 
    .pipe(csv.parse({trim: true})) 
    .pipe(ps.map({concurrent: 4}, row => processRowDataWhichMightBeAsyncAndReturnPromise(row))) 
    .wait().then(_ => { 
    console.log("All done!"); 
    }); 

工程與背壓和一切。