2012-06-27 83 views
10

我需要制定出實時讀取正在使用node.js寫入文件的數據的最佳方法。麻煩的是,Node是一種快速移動的船舶,它使尋找解決困難問題的最佳方法成爲可能。使用Node.js實時讀取文件

我想做
我有一個做的東西,然後寫的這個東西它到一個文本文件的結果的java程序。它通常需要5分鐘到5小時的任何時間來運行,數據一直寫入,並且可以達到一些相當高的吞吐率(大約1000行/秒)。

我想實時讀取此文件,然後使用節點聚合數據並將其寫入到可在客戶端上繪製圖形的套接字中。

客戶端,圖表,套接字和聚合邏輯都已完成,但我對閱讀文件的最佳方法感到困惑。

我曾嘗試(或至少與飾演)
FIFO - 我可以告訴我的Java程序來寫入FIFO和閱讀本使用節點,其實這是我們如何有這個目前使用Perl implemted ,但是因爲其他所有內容都在節點中運行,所以移植代碼很有意義。

Unix Sockets - 如上所述。

fs.watchFile - 這會工作,我們需要什麼?

fs.createReadStream - 這比WatchFile更好嗎?

fs & tail -f - 看起來像一個黑客。

什麼,其實是我的問題
我傾向於使用Unix套接字,這似乎是最快的選擇。但是節點是否具有更好的實時讀取fs文件的內置功能?

回答

6

如果你想保持文件作爲數據的持久化存儲,以防止流的損失在系統崩潰或成員之一的網絡中的情況下,正在運行的進程死亡後,您仍然可以繼續寫入文件並從中讀取數據。

如果您不需要此文件作爲您的Java進程生成結果的持久性存儲,那麼使用Unix套接字對於易用性和性能都要好得多。

fs.watchFile()不是你所需要的,因爲它在文件系統報告時對文件統計信息起作用,並且由於你想讀取正在寫入的文件,所以這不是你想要的。

短的更新:我很抱歉地認識到,雖然我曾指責fs.watchFile()在前面的段落使用文件統計,我做了我自己同樣的事情在我下面的示例代碼!儘管我已經告誡讀者「保重!」因爲我已經在幾分鐘內寫完了,甚至沒有經過很好的測試;但是,如果底層系統支持它,則可以通過使用fs.watch()而不是watchFilefstatSync來做得更好。

讀/從文件寫作,我剛纔寫下面的樂趣在我的休息:

測試-FS-writer.js:因爲你在Java編寫的文件你不會需要這個過程]

var fs = require('fs'), 
    lineno=0; 

var stream = fs.createWriteStream('test-read-write.txt', {flags:'a'}); 

stream.on('open', function() { 
    console.log('Stream opened, will start writing in 2 secs'); 
    setInterval(function() { stream.write((++lineno)+' oi!\n'); }, 2000); 
}); 

測試-FS-reader.js:小心,這只是演示,檢查ERR對象]

var fs = require('fs'), 
    bite_size = 256, 
    readbytes = 0, 
    file; 

fs.open('test-read-write.txt', 'r', function(err, fd) { file = fd; readsome(); }); 

function readsome() { 
    var stats = fs.fstatSync(file); // yes sometimes async does not make sense! 
    if(stats.size<readbytes+1) { 
     console.log('Hehe I am much faster than your writer..! I will sleep for a while, I deserve it!'); 
     setTimeout(readsome, 3000); 
    } 
    else { 
     fs.read(file, new Buffer(bite_size), 0, bite_size, readbytes, processsome); 
    } 
} 

function processsome(err, bytecount, buff) { 
    console.log('Read', bytecount, 'and will process it now.'); 

    // Here we will process our incoming data: 
     // Do whatever you need. Just be careful about not using beyond the bytecount in buff. 
     console.log(buff.toString('utf-8', 0, bytecount)); 

    // So we continue reading from where we left: 
    readbytes+=bytecount; 
    process.nextTick(readsome); 
} 

您可以安全避免直接使用nextTick並直接致電readsome()。由於我們仍然在這裏同步工作,所以在任何意義上都沒有必要。我只是喜歡它。 :P

EDIT通過Oliver Lloyd

以上面的例子,但它延伸到讀CSV數據給出:

var lastLineFeed, 
    lineArray; 
function processsome(err, bytecount, buff) { 
    lastLineFeed = buff.toString('utf-8', 0, bytecount).lastIndexOf('\n'); 

    if(lastLineFeed > -1){ 

     // Split the buffer by line 
     lineArray = buff.toString('utf-8', 0, bytecount).slice(0,lastLineFeed).split('\n'); 

     // Then split each line by comma 
     for(i=0;i<lineArray.length;i++){ 
      // Add read rows to an array for use elsewhere 
      valueArray.push(lineArray[i].split(',')); 
     } 

     // Set a new position to read from 
     readbytes+=lastLineFeed+1; 
    } else { 
     // No complete lines were read 
     readbytes+=bytecount; 
    } 
    process.nextTick(readFile); 
} 
+0

這是直接解決我的問題的一個很好的例子。這需要加強,但一次只能處理一行,但可以說這是一件好事;節點缺乏現有的fs接口意味着它是完全可定製的,所以即使我必須編寫額外的代碼,我也可以實現我所需要的。 –

+0

我擴展了上面的示例以使用CSV文件。 –

+0

這在絕對當作節點運行,但我怎樣才能把這段代碼放在app.js中並在html頁面中得到結果呢? – sand

4

爲什麼你認爲tail -f是黑客?

雖然我發現一個很好的例子,我會做類似的事情。使用的是Node.js和WebSocket的 實時在線活動監視器例如:
http://blog.new-bamboo.co.uk/2009/12/7/real-time-online-activity-monitor-example-with-node-js-and-websocket

只是爲了讓這個完整的答案,我寫給你這將0.8.0下運行的示例代碼 - (HTTP服務器是一個黑客可能)。

孩子進程產生與尾運行,並且因爲一個子進程是三流的EventEmitter(我們在我們的例子標準輸出使用),你可以只用on

文件名添加了一個監聽器:tailServer。JS

用法:node tailServer /var/log/filename.log

var http = require("http"); 
var filename = process.argv[2]; 


if (!filename) 
    return console.log("Usage: node tailServer filename"); 

var spawn = require('child_process').spawn; 
var tail = spawn('tail', ['-f', filename]); 

http.createServer(function (request, response) { 
    console.log('request starting...'); 

    response.writeHead(200, {'Content-Type': 'text/plain' }); 

    tail.stdout.on('data', function (data) { 
     response.write('' + data);     
    }); 
}).listen(8088); 

console.log('Server running at http://127.0.0.1:8088/'); 
+0

我與尾-f關注的是,它需要讀取過程是活動的文件寫入之前,如果不是數據丟失。我的用例是這樣的,讀取可能會在數據寫入後很長時間發生。對於更新到0.8的+1,雖然這對於從同一個源控制寫入和讀取的位置來說是一個很好的解決方案。 –

+0

watchFile也是事件驅動的,但根據文檔不穩定。上面的例子handels通過在高級代碼中進行輪詢來修改文件。對我來說,這看起來像一個黑客。但只要它對你有用,這是很好的做法。否則,如果文件不存在,則可以「觸摸」該文件,並且不會丟失任何數據,並且可以用「wc -l message.text | awk'{print $ 1}''並將其交給'tail -f -n' – vik

0

我從@hasanyasin處取得了答案並將其包裹變成模塊化的承諾。基本的想法是,你傳遞一個文件和一個處理函數來處理從文件讀取的字符串化緩衝區。如果處理函數返回true,那麼該文件將停止讀取。你也可以設置一個超時,如果處理程序沒有足夠快地返回真,它將會終止讀取。

如果resolve()由於超時被調用,promiser將返回true,否則它將返回false。

查看底部的使用示例。

// https://stackoverflow.com/a/11233045 

var fs = require('fs'); 
var Promise = require('promise'); 

class liveReaderPromiseMe { 
    constructor(file, buffStringHandler, opts) { 
     /* 
      var opts = { 
       starting_position: 0, 
       byte_size: 256, 
       check_for_bytes_every_ms: 3000, 
       no_handler_resolution_timeout_ms: null 
      }; 
     */ 

     if (file == null) { 
      throw new Error("file arg must be present"); 
     } else { 
      this.file = file; 
     } 

     if (buffStringHandler == null) { 
      throw new Error("buffStringHandler arg must be present"); 
     } else { 
      this.buffStringHandler = buffStringHandler; 
     } 

     if (opts == null) { 
      opts = {}; 
     } 

     if (opts.starting_position == null) { 
      this.current_position = 0; 
     } else { 
      this.current_position = opts.starting_position; 
     } 

     if (opts.byte_size == null) { 
      this.byte_size = 256; 
     } else { 
      this.byte_size = opts.byte_size; 
     } 

     if (opts.check_for_bytes_every_ms == null) { 
      this.check_for_bytes_every_ms = 3000; 
     } else { 
      this.check_for_bytes_every_ms = opts.check_for_bytes_every_ms; 
     } 

     if (opts.no_handler_resolution_timeout_ms == null) { 
      this.no_handler_resolution_timeout_ms = null; 
     } else { 
      this.no_handler_resolution_timeout_ms = opts.no_handler_resolution_timeout_ms; 
     } 
    } 


    startHandlerTimeout() { 
     if (this.no_handler_resolution_timeout_ms && (this._handlerTimer == null)) { 
      var that = this; 
      this._handlerTimer = setTimeout(
       function() { 
        that._is_handler_timed_out = true; 
       }, 
       this.no_handler_resolution_timeout_ms 
      ); 
     } 
    } 

    clearHandlerTimeout() { 
     if (this._handlerTimer != null) { 
      clearTimeout(this._handlerTimer); 
      this._handlerTimer = null; 
     } 
     this._is_handler_timed_out = false; 
    } 

    isHandlerTimedOut() { 
     return !!this._is_handler_timed_out; 
    } 


    fsReadCallback(err, bytecount, buff) { 
     try { 
      if (err) { 
       throw err; 
      } else { 
       this.current_position += bytecount; 
       var buff_str = buff.toString('utf-8', 0, bytecount); 

       var that = this; 

       Promise.resolve().then(function() { 
        return that.buffStringHandler(buff_str); 
       }).then(function(is_handler_resolved) { 
        if (is_handler_resolved) { 
         that.resolve(false); 
        } else { 
         process.nextTick(that.doReading.bind(that)); 
        } 
       }).catch(function(err) { 
        that.reject(err); 
       }); 
      } 
     } catch(err) { 
      this.reject(err); 
     } 
    } 

    fsRead(bytecount) { 
     fs.read(
      this.file, 
      new Buffer(bytecount), 
      0, 
      bytecount, 
      this.current_position, 
      this.fsReadCallback.bind(this) 
     ); 
    } 

    doReading() { 
     if (this.isHandlerTimedOut()) { 
      return this.resolve(true); 
     } 

     var max_next_bytes = fs.fstatSync(this.file).size - this.current_position; 
     if (max_next_bytes) { 
      this.fsRead((this.byte_size > max_next_bytes) ? max_next_bytes : this.byte_size); 
     } else { 
      setTimeout(this.doReading.bind(this), this.check_for_bytes_every_ms); 
     } 
    } 


    promiser() { 
     var that = this; 
     return new Promise(function(resolve, reject) { 
      that.resolve = resolve; 
      that.reject = reject; 
      that.doReading(); 
      that.startHandlerTimeout(); 
     }).then(function(was_resolved_by_timeout) { 
      that.clearHandlerTimeout(); 
      return was_resolved_by_timeout; 
     }); 
    } 
} 


module.exports = function(file, buffStringHandler, opts) { 
    try { 
     var live_reader = new liveReaderPromiseMe(file, buffStringHandler, opts); 
     return live_reader.promiser(); 
    } catch(err) { 
     return Promise.reject(err); 
    } 
}; 

然後用上面這樣的代碼:

var fs = require('fs'); 
var path = require('path'); 
var Promise = require('promise'); 
var liveReadAppendingFilePromiser = require('./path/to/liveReadAppendingFilePromiser'); 

var ending_str = '_THIS_IS_THE_END_'; 
var test_path = path.join('E:/tmp/test.txt'); 

var s_list = []; 
var buffStringHandler = function(s) { 
    s_list.push(s); 
    var tmp = s_list.join(''); 
    if (-1 !== tmp.indexOf(ending_str)) { 
     // if this return never occurs, then the file will be read until no_handler_resolution_timeout_ms 
     // by default, no_handler_resolution_timeout_ms is null, so read will continue forever until this function returns something that evaluates to true 
     return true; 
     // you can also return a promise: 
     // return Promise.resolve().then(function() { return true; }); 
    } 
}; 

var appender = fs.openSync(test_path, 'a'); 
try { 
    var reader = fs.openSync(test_path, 'r'); 
    try { 
     var options = { 
      starting_position: 0, 
      byte_size: 256, 
      check_for_bytes_every_ms: 3000, 
      no_handler_resolution_timeout_ms: 10000, 
     }; 

     liveReadAppendingFilePromiser(reader, buffStringHandler, options) 
     .then(function(did_reader_time_out) { 
      console.log('reader timed out: ', did_reader_time_out); 
      console.log(s_list.join('')); 
     }).catch(function(err) { 
      console.error('bad stuff: ', err); 
     }).then(function() { 
      fs.closeSync(appender); 
      fs.closeSync(reader); 
     }); 

     fs.write(appender, '\ncheck it out, I am a string'); 
     fs.write(appender, '\nwho killed kenny'); 
     //fs.write(appender, ending_str); 
    } catch(err) { 
     fs.closeSync(reader); 
     console.log('err1'); 
     throw err; 
    } 
} catch(err) { 
    fs.closeSync(appender); 
     console.log('err2'); 
    throw err; 
}