2016-08-02 22 views
0

Node - how can i pipe to a new READABLE stream?的Node.js - 創建一個新的ReadStream一個新的文件,當文件達到一定規模

我想開始新的ReadStream我的直播編碼的MP3文件,當它到達一個繼通過使用fs.watchfs.stat確定尺寸(預緩衝,基本上)。

它可以工作,但一旦ReadStream已經啓動,我不知道如何退出觀察器並保持流運行。

我已經嘗試了承諾如下,但從來沒有得到解決,所以streamEncodedFile被一再呼籲:

var watcher = fs.watch(mp3RecordingFile); 

watcher.on('change', (event, path) => { 

    fs.stat(mp3RecordingFile, function (err, stats) { 

    if (stats.size > 75533) { 

     new Promise(function(resolve, reject) { 
       streamEncodedFile(); 
     }) 
     .then(function(result) { 
       watcher.close(); 
       console.log('watcher closed'); 
     }); 

    } 

    }); 
}); 

function streamEncodedFile() { 

    var mp3File = fs.createReadStream(mp3RecordingFile); 

      mp3File.on('data', function(buffer){ 
       io.sockets.emit('audio', { buffer: buffer }); 
      }); 

} 

我的另一個可憐的企圖是儘量只在某個文件的大小開始流:

watcher.on('change', (event, path) => { 

    fs.stat(mp3RecordingFile, function (err, stats) { 

    console.log(stats.size); 

    if (stats.size > 75533 && stats.size < 75535) { 
       streamEncodedFile(); 
    } else if (stats.size > 75535) { 
       watcher.close(); 
    } 

    }); 
}); 
+0

'變種觀察者= fs.watch(mp3RecordingFile); watcher.on( '變更',(事件,路徑)=> { fs.stat(mp3RecordingFile,函數(ERR,統計數據){ 如果(stats.size> 75533){ streamEncodedFile();} }); }); function streamEncodedFile(){ var mp3File = fs.createReadStream(mp3RecordingFile); mp3File.on('data',function(buffer){io.sockets.emit('audio',{ buffer:buffer }); }); watcher.close(); }' – dottodot

+0

謝謝。我應該說我嘗試過,但是ReadStream結束了。這我相信是爲什麼https://github.com/jasontbradshaw/tailing-stream – user3174541

回答

0

試試這個解決方案,通過緩衝和寫入文件。

const Writable = require('stream').Writable; 
const fs = require('fs'); 

let mp3File = fs.createWriteStream('path/to/file.mp3'); 

var buffer = new Buffer([]); 
//in bytes 
const CHUNK_SIZE = 102400; //100kb 

//Proxy for emitting and writing to file 
const myWritable = new Writable({ 
    write(chunk, encoding, callback) { 
    buffer = Buffer.concat([buffer, chunk]); 
    if(buffer.length >= CHUNK_SIZE) { 
     mp3File.write(buffer); 
     io.sockets.emit('audio', { buffer: buffer}); 
     buffer = new Buffer([]); 
    } 

    callback(); 
    } 
}); 

myWritable.on('finish',() => { 
    //emit final part if there is data to emit 
    if(buffer.length) { 
     //write final chunk and close fd 
     mp3File.end(buffer); 
     io.sockets.emit('audio', { buffer: buffer}); 
    } 
}); 


inbound_stream.pipe(encoder).pipe(myWritable); 
+0

等不及試試這個。再次感謝! – user3174541

+0

嗨* *納扎爾薩哈連科。我不明白你爲什麼使用'+ ='追加到緩衝區。這些塊是ArrayBuffer對象,需要保持這種方式才能在另一端進行解碼。我們不應該''buffer.push(塊)'?然後我需要以某種方式迭代數組以開始從它發送原始的ArrayBuffers。除非我錯過了一些東西。謝謝! – user3174541

+0

不,我忘了你操縱緩衝區。答案已更新。 –

相關問題