2017-03-31 28 views
0

因此,我創建了一個讀取流,首先連接到SFTP並開始從文件讀取。在任何時候,我的代碼都可以重新設置readstream並執行其他操作。例如,我可能會使用它來獲取CSV的前幾行並停止閱讀。節點流 - 在可讀流中收聽unpipe

問題是,我不知道如何在我的readStream構造函數中偵聽unpipe事件,以便我可以正確關閉SFTP連接。我在寫入流中使用flush方法,是否有類似於讀取流的方法?

這裏是我的readStream構造的簡化部分:

const Client = require('ssh2').Client, 
     nom = require('noms'); 

function getStream (get) { 
    const self = this; 
    const conn = new Client(); 

    let client, 
     fileData, 
     buffer, 
     totalBytes = 0, 
     bytesRead = 0; 

    let read = function(size,next) { 
     const read = this; 
     // Read each chunk of the file 
     client.read(fileData, buffer, bytesRead, size, bytesRead, 
      function (err, byteCount, buff, pos) { 
       bytesRead += byteCount; 
       read.push(buff); 
       next(); 
      } 
     ); 
    }; 

    let before = function(start) { 
     // setup the connection BEFORE we start _read 
     conn.on('ready', function(){ 
      conn.sftp(function(err,sftp) { 
       sftp.open(get, 'r', function(err, fd){ 
        sftp.fstat(fd, function(err, stats) { 
         client = sftp; 
         fileData = fd; 
         totalBytes = stats.size; 
         buffer = new Buffer(totalBytes); 

         start(); 
        }); 
       }); 
      }); 
     }).connect(credentials); 
    }; 

    return nom(read,before); 
} 

後來我可以稱之爲myStream.pipe(writeStream)然後myStream.unpipe()。但是因爲我沒有辦法監聽該事件,讀取停止,但SFTP連接保持打開並最終超時。

任何想法?

回答

0

因此,在進行了更多研究之後,我瞭解到當您致電readStream.unpipe(writeStream)時,ReadStreams不會通過unpipe事件。該事件只傳遞給writeStream。爲了監聽unpipe,你需要在readStream明確地發出一個事件,像這樣:

readStream.emit('unpipe'); 

您可以偵聽此事件的任何位置,內部或流構造函數,它真的很方便之外。所以,這將使這樣看上面的代碼:

故事
function getStream (get) { 
    /** 
    * ... stuff 
    * ... read() 
    * ... before() 
    * ... etc 
    */ 

    let readStream = nom(read,before); 

    readStream.on('unpipe', function(){ 
     console.log('called unpipe on read stream'); 
    }); 

    return readStream; 
} 

道德,流已經有Event Emitter class methods,這樣你就可以發出並監聽自定義事件開箱。