2014-10-03 50 views
2

我有一個EventEmitter對象,我設置爲收聽事件。發生事件時,我想將信息寫入文件。我有一個開放的FileStream通過fs.createWriteStream(path, { flags: 'a'});目前,我的問題是,如果我發出事件超快,經常,我開始得到「備份」。 IE .write返回false要求我停止寫一會兒。由於我在事件處理程序中進行寫操作,因此附近沒有用於指示寫入過程結束的回調函數。我可以從處理或排放方面做什麼來防止備份?從事件處理程序寫入WriteStream

最終,它似乎並不重要;所有的數據都被寫入文件。但我希望儘可能遵循「規則」。

我知道我可以聽drain事件並在此後再次開始寫,但是如何防止其他事件進入處理程序?我注意到,如果我在每次發射之前放置了50毫秒的延遲,備份似乎不會發生,但似乎有點像黑客。另外如果你有一個較慢的硬盤?

下面是我的情況爲例:

var ee = new EventEmitter(); 
var stream = fs.createWriteStream('./file/log.txt', { flags:'a'}); 

ee.on('report', function (i) { 
    stream.write('new file data ' + i + ' --- ' + Date.now + '\n'); 
}); 

for (var i = 0; i < 10000; ++i) { 
    ee.emit('report', i) 
} 

這不是確切的代碼,但是這是它的要點。完整的代碼發生在從正在運行的HTTP服務器發送響應時,但如果我像1000個請求一樣排隊,通過for循環,我就會遇到上述情況。

回答

1

我實際上最終找到了一個更簡單的解決方案來解決這個問題,使用讀寫流。請參閱下面的代碼示例

var stream = require('stream'); 
var fs = require('fs'); 
var EventEmitter = require('events').EventEmitter; 

var ee = new EventEmitter(); 
var writeStream = fs.createWriteStream('./file/log.txt', { flags: 'a', end: false }); 
var readStream = new stream.Readable(); 
// This needs to be here for compatibility reasons, but is intentionally a no-op 
readStream._read = function() {}; 

ee.on('report', function (i) { 
    readStream.push(i.toString()); 
}); 

readStream.pipe(writeStream); 

for (var i = 0; i < 10000; ++i) { 
    ee.emit('report', i); 
} 

這將允許Node管道和流系統處理與OS協調的背壓。這是IMO這個問題的首選方法。

+0

您應該重命名您的變量以匹配您的函數調用。 Ex readStream實際上是一個writeStream,因爲這是您創建的內容,並且您推送到可讀流而不是writeStream,然後從可讀流傳輸到可寫流。如果你修復它,我會刪除我的投票並用一個投票取代它。 – esnm 2015-04-12 14:08:30

+0

我不認爲真的需要downvote,但代碼存在實際問題,所以我解決了這個問題。 – arb 2015-04-13 13:28:23

+0

Upvoted它。你的命名混亂和誤導。現在看起來好多了。謝謝。 – esnm 2015-04-14 15:00:34

0

處理這個問題的理想方法是傳入事件,如果事件來自流,或者以某種方式暫停,您可以執行此操作,但這並非總是可行。

如果您無法以某種方式暫停傳入的事件,那麼我通常使用async模塊的queue函數來處理此事件。當然,還有很多其他方法可以做到這一點,但使用隊列是我找到的最簡單的方法,並且async模塊(適用於大量異步操作)非常好。

其基本思想是將所有write調用放入配置爲一次只能處理1個任務的隊列中。如果您從stream.write撥打false回電,那麼您pause()queue。一旦你從stream得到drain事件,你再次排隊resume()。這樣你就不會在stream飽和時寫信給你,但你仍然可以接收事件並在stream準備好時排隊。

這樣做,以你的示例代碼會是這個樣子:

var async = require('async'); 

var ee = new EventEmitter(); 
var stream = fs.createWriteStream('./file/log.txt', { flags:'a'}); 

// Create a queue with a concurrency of 1 
var writeQueue = async.queue(function(data, callback) { 
    if (!stream.write(data)) { 
     // if write() returns false, it's saturated; pause the queue 
     writeQueue.pause(); 
    } 
    callback(); 
}, 1); // <-- concurrency argument here; it's easy to miss ;) 

stream.on('drain', function() { 
    // the stream isn't saturated anymore; resume the queue 
    writeQueue.resume(); 
}) 

ee.on('report', function (i) { 
    // instead of writing directly to the stream, push data to the writeQueue 
    writeQueue.push('new file data ' + i + ' --- ' + Date.now() + '\n'); 
}); 

for (var i = 0; i < 10000; ++i) { 
    ee.emit('report', i) 
} 

注:這是不是真的從只是讓內部流緩衝區的東西不同。你仍然在緩衝數據,你只是自己做,並且讓你更好地控制情況。