2015-04-01 166 views
3

我試圖寫一個非常實用的方式。我們使用Highland.js來管理流處理,但是因爲我太新了,我想我對如何處理這種獨特的情況感到困惑。Highland.js CSV解析

這裏的問題是文件流中的所有數據都不一致。文件中的第一行通常是頭,我們希望將其存儲到內存中,然後壓縮流中的所有行。

這是我第一次去吧:

var _  = require('highland'); 
var fs  = require('fs'); 
var stream = fs.createReadStream('./data/gigfile.txt'); 
var output = fs.createWriteStream('output.txt'); 

var headers = []; 

var through = _.pipeline(
    _.split(), 
    _.head(), 
    _.doto(function(col) { 
     headers = col.split(','); 
     return headers; 
    }), 

    ...... 

    _.splitBy(','), 
    _.zip(headers), 
    _.wrapCallback(process) 
); 

_(stream) 
    .pipe(through) 
    .pipe(output); 

在管道中的第一個命令是由行文件分割。下一個抓取標題,doto聲明它是一個全局變量。問題是流中接下來的幾行不存在,所以進程被阻塞......可能是因爲它上面的head()命令。

我試過其他一些變化,但我覺得這個例子給你一個我需要去的地方的感覺。

有關這方面的任何指導都會有所幫助 - 它也提出了我的每行中是否有不同值的問題,我如何在多種不同長度/複雜度的流操作之間分割流程流。

謝謝。

編輯:我產生了一個更好的結果,但我質疑它的效率 - 有沒有一種方法我可以優化這個,所以在每次運行我不檢查頭是否被記錄?這仍然感覺不穩定。

var through = _.pipeline(
    _.split(), 
    _.filter(function(row) { 
     // Filter out bogus values 
     if (! row || headers) { 
      return true; 
     } 
     headers = row.split(','); 
     return false; 
    }), 
    _.map(function(row) { 
     return row.split(',') 
    }), 
    _.batch(500), 
    _.compact(), 
    _.map(function(row) { 
     return JSON.stringify(row) + "\n"; 
    }) 
); 

_(stream) 
    .pipe(through) 

回答

3

您可以使用Stream.observe()Stream.fork()分裂流。

var _  = require('highland'); 
var fs  = require('fs'); 
var stream = fs.createReadStream('./data/gigfile.txt'); 
var output = fs.createWriteStream('output.txt'); 
var through = highland.pipeline(function(s) { 
    var headerStream, headers; 
    // setup a shared variable to store the headers 
    headers = []; 
    // setup the csv processing 
    s = s 
     // split input into lines 
     .split() 
     // remove empty lines 
     .compact() 
     // split lines into arrays 
     .map(function(row) { 
      return row.split(','); 
     }); 
    // create a new stream to grab the header 
    headerStream = s.observe(); 
    // pause the original stream 
    s.pause(); 
    // setup processing of the non-header rows 
    s = s 
     // drop the header row 
     .drop(1) 
     // convert the rest of the rows to objects 
     .map(function(row) { 
      var obj = headers.reduce(function(obj, key, i) { 
       obj[key] = row[i]; 
       return obj; 
      }, {}); 
      return JSON.stringify(obj) + "\n"; 
     }); 
    // grab the first row from the header stream 
    // save the headers and then resume the normal stream 
    headerStream.head().toArray(function(rows) { 
     headers = rows[0]; 
     s.resume(); 
    }); 
    return s; 
}); 
_(stream) 
    .pipe(through) 
    .pipe(output); 

也就是說,您的csv解析不會在您的值中轉義換行符和逗號。通常情況下,這通過在雙引號中包裝值來在csv文件中完成。然後雙引號通過將兩個相鄰放在一起來逃脫。這樣做有點棘手,所以我建議使用一個處理它的包,比如fast-csv

那麼你的代碼看起來是這樣的:

var _  = require('highland'); 
var fs  = require('fs'); 
var csv = require('fast-csv'); 
var stream = fs.createReadStream('./data/gigfile.txt'); 
var output = fs.createWriteStream('output.txt'); 

_(stream.pipe(csv({headers: true, ignoreEmpty: true}))) 
    .map(function(row) { 
     return JSON.stringify(row) + "\n"; 
    }) 
    .pipe(output);