因此,我創建了一個讀取流,首先連接到SFTP並開始從文件讀取。在任何時候,我的代碼都可以重新設置readstream並執行其他操作。例如,我可能會使用它來獲取CSV的前幾行並停止閱讀。節點流 - 在可讀流中收聽unpipe
問題是,我不知道如何在我的readStream構造函數中偵聽unpipe
事件,以便我可以正確關閉SFTP連接。我在寫入流中使用flush
方法,是否有類似於讀取流的方法?
這裏是我的readStream構造的簡化部分:
const Client = require('ssh2').Client,
nom = require('noms');
function getStream (get) {
const self = this;
const conn = new Client();
let client,
fileData,
buffer,
totalBytes = 0,
bytesRead = 0;
let read = function(size,next) {
const read = this;
// Read each chunk of the file
client.read(fileData, buffer, bytesRead, size, bytesRead,
function (err, byteCount, buff, pos) {
bytesRead += byteCount;
read.push(buff);
next();
}
);
};
let before = function(start) {
// setup the connection BEFORE we start _read
conn.on('ready', function(){
conn.sftp(function(err,sftp) {
sftp.open(get, 'r', function(err, fd){
sftp.fstat(fd, function(err, stats) {
client = sftp;
fileData = fd;
totalBytes = stats.size;
buffer = new Buffer(totalBytes);
start();
});
});
});
}).connect(credentials);
};
return nom(read,before);
}
後來我可以稱之爲myStream.pipe(writeStream)
然後myStream.unpipe()
。但是因爲我沒有辦法監聽該事件,讀取停止,但SFTP連接保持打開並最終超時。
任何想法?