2017-03-07 32 views
0

是否可以從fs.readStream處理數據流中的大塊數據?示例:讀取一個文件並計算一個字符出現的次數,而不將整個文件存儲在內存中。簡單地解析流中的每個塊並彙總一個數字..我嘗試創建一個函數傳遞給.pipe(),但是fn需要定義.on()並且失敗。是否有可能從fs.readStream攔截和定製過程流塊?

我在流媒體上有點初學者。我做了一堆谷歌搜索,閱讀,試驗,但沒有任何幫助。我也無法找到有關管道工作方式的信息,或者是否可以製作自己的定製管道接收器功能。

感謝

+0

你看看https://github.com/substack/stream-handbook? –

+0

我沒有 - 我現在要感謝 – KMongiello

回答

1

的easies的事情你可以做的是剛剛訂閱的「數據」事件,並簡單地計算的出現是這樣的:

'use strict'; 
 

 
const fs = require('fs'); 
 

 
const countCharsInString = function (st, char) { 
 
\t //Use a regex to calculate this 
 
\t let regex = new RegExp(char, 'g'); 
 
\t return (st.match(regex) || []).length 
 
}; 
 

 
const CHAR_TO_COUNT = '1'; 
 
let total_count = 0; 
 

 
let fileStream = fs.createReadStream('./log.txt'); 
 

 
//We'll calculate the occurrences as the stream emits data event 
 
//As we don't keep any references to 'chunk' and we don't collect the data anywhere 
 
//The garbage collector will clean the memory and we'll not run out of the RAM. 
 
fileStream.on('data', chunk => { 
 
\t let string = chunk.toString(); 
 
\t total_count += countCharsInString(string, CHAR_TO_COUNT); 
 
}); 
 
fileStream.on('error', err => console.error(err.stack)); 
 
//Log the count when everything is done. 
 
fileStream.on('end',() => console.log(`All done. There are ${total_count} occurrences of character '${CHAR_TO_COUNT}'.`));

但是如果你想要做的通過數據流進行處理,即時彙總數據,然後將數據傳輸到其他地方,以下是您可以執行的操作:

'use strict'; 
 

 
const fs = require('fs'); 
 
const Transform = require('stream').Transform; 
 

 
//We inherit from the Transform stream class 
 
class OcurrenceCountingStream extends Transform { 
 
\t constructor(options) { 
 
\t \t super(options); 
 

 
\t \t //Allowing to pass an option here 
 
\t \t this.charToCount = options.charToCount; 
 
\t \t this.totalCount = 0; 
 
\t } 
 

 
\t //This is now a static method as it's a pure function 
 
\t //That does not depend on the object state 
 
\t static countCharsInString(st, char) { 
 
\t \t //Use a regex to calculate this 
 
\t \t let regex = new RegExp(char, 'g'); 
 
\t \t return (st.match(regex) || []).length 
 
\t } 
 

 
\t //We should implement _transform function 
 
\t //in order to make all piping mechanisms working 
 
\t _transform(chunk, encoding, callback) { 
 
\t \t //Get our string, process and update totalCount 
 
\t \t let string = chunk.toString(); 
 
\t \t this.totalCount += OcurrenceCountingStream.countCharsInString(string, this.charToCount); 
 

 
\t \t //Pass the data further 
 
\t \t callback(null, chunk); 
 
\t } 
 
} 
 

 
let fileStream = fs.createReadStream('./log.txt'); 
 
let countingStream = new OcurrenceCountingStream({charToCount: '1'}); 
 

 
//So now we can pipe 
 
fileStream.pipe(countingStream); 
 
/* 
 
Here is a little moment. 
 
The stream should be in a flowing mode. This means that is started reading the data 
 
From the writable that was piped to it and will keep reading until the writer is ended 
 
So nothing basically happens if we just pipe it like this fileStream.pipe(countingStream); 
 
There are some ways to make countingStream flowing: 
 
1) Subscribe on its 'data' event. 
 
2) Pipe it somewhere 
 
3) Call .resume() is we don't really care of the data that's comming out. 
 
*/ 
 
countingStream.resume(); 
 

 
countingStream.on('error', err => console.error(err.stack)); 
 
countingStream.on('end',() => console.log(`All done. There are ${countingStream.totalCount} occurrences of character '${countingStream.charToCount}'.`));

相關問題