我試圖寫一個非常實用的方式。我們使用Highland.js來管理流處理,但是因爲我太新了,我想我對如何處理這種獨特的情況感到困惑。Highland.js CSV解析
這裏的問題是文件流中的所有數據都不一致。文件中的第一行通常是頭,我們希望將其存儲到內存中,然後壓縮流中的所有行。
這是我第一次去吧:
var _ = require('highland');
var fs = require('fs');
var stream = fs.createReadStream('./data/gigfile.txt');
var output = fs.createWriteStream('output.txt');
var headers = [];
var through = _.pipeline(
_.split(),
_.head(),
_.doto(function(col) {
headers = col.split(',');
return headers;
}),
......
_.splitBy(','),
_.zip(headers),
_.wrapCallback(process)
);
_(stream)
.pipe(through)
.pipe(output);
在管道中的第一個命令是由行文件分割。下一個抓取標題,doto聲明它是一個全局變量。問題是流中接下來的幾行不存在,所以進程被阻塞......可能是因爲它上面的head()命令。
我試過其他一些變化,但我覺得這個例子給你一個我需要去的地方的感覺。
有關這方面的任何指導都會有所幫助 - 它也提出了我的每行中是否有不同值的問題,我如何在多種不同長度/複雜度的流操作之間分割流程流。
謝謝。
編輯:我產生了一個更好的結果,但我質疑它的效率 - 有沒有一種方法我可以優化這個,所以在每次運行我不檢查頭是否被記錄?這仍然感覺不穩定。
var through = _.pipeline(
_.split(),
_.filter(function(row) {
// Filter out bogus values
if (! row || headers) {
return true;
}
headers = row.split(',');
return false;
}),
_.map(function(row) {
return row.split(',')
}),
_.batch(500),
_.compact(),
_.map(function(row) {
return JSON.stringify(row) + "\n";
})
);
_(stream)
.pipe(through)