2015-10-05 17 views
0

我想實現一個通過套接字發送和接收文件的協議。該協議是指定的,我不能改變這一點。在NodeJs中對Stream實現感到困惑

我是新來的NodeJs,這是我如何實現這一點。

我會寫一個雙工流,並將管道文件寫入它。然後將其插入套接字以發送數據。

我應該閱讀這些內容以及在哪裏寫這些內容。如何知道讀取文件已完成,以及如何告知socket已完成。 Docs對我來說不是很清楚,並且Google使用了更多的混淆:)

任何幫助,將不勝感激。

P.S.當我回家時,我會添加自己的樣本,但現在我沒有。

編輯

@ MattHarrison的回答後,我改變了代碼轉換成這樣:

var stream = require('stream'); 
var util = require('util'); 
var bufferpack = require('bufferpack'); 
var fs = require('fs'); 
var net = require('net'); 

var MyProtocolStream = function() { 

    this.writtenHeader = false;   // have we written the header yet? 
    stream.Transform.call(this); 
}; 

util.inherits(MyProtocolStream, stream.Transform); 

MyProtocolStream.prototype._transform = function (chunk, encoding, callback) { 

    if (!this.writtenHeader) { 
     this.push('==== HEADER ====\n'); // if we've not, send the header first 
    } 

    // Can this function be interrupted at this very line? 
    // Then another _transform comes in and pushes its own data to socket 
    // Corrupted data maybe then? 
    // How to prevent this behavior? Buffering whole file before sending? 

    var self = this; 
    // I put a random timeout to simulate overlapped calls 
    // Can this happen in real world? 
    setTimeout(function() { 
     self.push(chunk); // send the incoming file chunks along as-is 
     callback(); 
    }, Math.random()*10); 
}; 

MyProtocolStream.prototype._flush = function (callback) { 

    this.push('==== FOOTER ====\n');  // Just before the stream closes, send footer 
    callback(); 
}; 

var file = '/tmp/a'; 

var server = net.createServer(function (sck) { 
    sck.addr = sck.remoteAddress; 
    console.log('Client connected - ' + sck.addr); 
    fs.createReadStream('/tmp/a').pipe(new MyProtocolStream()).pipe(sck); 
    fs.createReadStream('/tmp/b').pipe(new MyProtocolStream()).pipe(sck); 
    fs.createReadStream('/tmp/c').pipe(new MyProtocolStream()).pipe(sck); 
    sck.on('close', function() { 
     console.log('Client disconnected - ' + this.addr); 
    }) 
}); 

server.listen(22333, function() { 
    console.log('Server started on ' + 22333) 
}); 

看到我的評論在_transform

+0

這個問題現在太模糊了。你至少應該自己嘗試一下,然後問一個關於我們可以幫助的痛點的具體問題。 –

+0

@MattHarrison你是對的。我現在在家,並添加了我的示例代碼。 – vfsoraki

+0

你說你試圖實現一個自定義協議,但它看起來像只是在重新發明節點流時發送文件到套接字。您的自定義協議實際上做了什麼?也許你應該從那開始。 –

回答

3

我不確定您想要實現的協議的具體細節,但以下應該會給您一個很好的模式,以便您可以適應您的需求。

我虛構的協議

當客戶端插座連接到我的TCP服務器端,我想給他們一個文件。但首先我想發送一個標題。在文件結尾處,在流結束之前,我也想發送一個頭文件。所以寫入套接字數據的模樣:

==== HEADER ==== 
[FILE CONTENTS] 
==== FOOTER ==== 

實現一個變換流

所有我想要做的就是變換出來可讀流的數據。注意變換是這裏的關鍵字。我可以使用一個Transform流。

創建轉換流時,可以覆蓋兩種方法:_transform_flush。每個塊都來自可讀流,調用_transform。您可以更改數據,將其緩衝或進行其他操作。在可讀的所有數據完成後立即調用_flush。你可以在這裏做更多的清理工作,或者寫出最後一點數據。

var Stream = require('stream'); 
var Util = require('util'); 

var MyProtocolStream = function() { 

    this.writtenHeader = false;   // have we written the header yet? 
    Stream.Transform.call(this); 
}; 

Util.inherits(MyProtocolStream, Stream.Transform); 

MyProtocolStream.prototype._transform = function (chunk, encoding, callback) { 

    if (!this.writtenHeader) { 
     this.push('==== HEADER ====\n'); // if we've not, send the header first 
     this.writtenHeader = true; 
    } 
    this.push(chunk);      // send the incoming file chunks along as-is 
    callback(); 
}; 

MyProtocolStream.prototype._flush = function (callback) { 

    this.push('==== FOOTER ====\n');  // Just before the stream closes, send footer 
    callback(); 
}; 

使用MyProtocolStream

所以,現在我有我想要做什麼流,我可以簡單地管道的文件(或任何爲此事可讀流)通過我的自定義轉換流成任何其他可寫入流(例如套接字)。

var Fs = require('fs'); 
var Net = require('net'); 

var server = Net.createServer(function (socket) { 

    Fs.createReadStream('./example.txt') 
     .pipe(new MyProtocolStream()) 
     .pipe(socket); 
}); 

server.listen(8000); 

測試出來

我可以通過添加一些內容,測試了這一點,以example.txt

This is a line 
This is another line 
This is the last line 

我可以旋轉起來在我的服務器,然後通過telnet/NC連接:

$ telnet 127.0.0.1 8000 
Trying 127.0.0.1... 
Connected to localhost. 
Escape character is '^]'. 
==== HEADER ==== 
This is a line 
This is another line 
This is the last line 
==== FOOTER ==== 
Connection closed by foreign host. 

那麼雙面流呢?

雙工流是嵌入在一個內的兩個流。數據是從一個數據中提取出來的,而您將完全不同的數據寫入另一個。它用於與另一個實體進行雙向通信的地方(例如TCP套接字)。在這個例子中,我們不需要一個複式流,因爲數據只在一個方向流動:

file -> MyProtocolStream -> socket

學習更

由於梅厄在對方的回答中指出,Substack's stream handbook是規範(和最佳)的資源以及官方文檔。如果您仔細閱讀並自己實施示例,您將瞭解到有關流的所有知識。

串聯

發送多個文件,在單個插座如果你想要寫這些變換的多個輸出流至單個寫入結尾,pipe()是不會爲你工作。一旦EOF來自單個流,上游可寫入(套接字)也將被關閉。在這種情況下也不能保證數據事件的排序。所以,你需要通過聽data/end事件手動聚集流,開始讀取一個流另一完成後:

var server = Net.createServer(function (socket) { 

    var incoming1 = Fs.createReadStream('./example.txt') 
      .pipe(new MyProtocolStream()); 

    var incoming2 = Fs.createReadStream('./example.txt') 
      .pipe(new MyProtocolStream()); 

    var readStream = function (stream, callback) { 

     stream.on('data', socket.write.bind(socket)); 
     stream.on('end', callback); 
    }; 

    readStream(incoming1, function() { 

     readStream(incoming2, function() { 

      socket.end(); 
     }); 
    }); 
}); 

如果你嵌套回調厭惡的,你也可以使用承諾:

var server = Net.createServer(function (socket) { 

    var incoming1 = Fs.createReadStream('./example.txt') 
      .pipe(new MyProtocolStream()); 

    var incoming2 = Fs.createReadStream('./example.txt') 
      .pipe(new MyProtocolStream()); 

    var incoming3 = Fs.createReadStream('./example.txt') 
      .pipe(new MyProtocolStream()); 

    var readStream = function (stream) { 
     return new Promise(function (resolve, reject) { 
      stream.on('data', socket.write.bind(socket)); 
      stream.on('end', resolve); 
     }); 
    }; 

    readStream(incoming1) 
    .then(function() { 
     return readStream(incoming2); 
    }) 
    .then(function() { 
     return readStream(incoming3); 
    }) 
    .then(function() { 
     socket.end(); 
    }); 
}); 
+0

謝謝,它確實工作正常。只需要一些問題來更好地理解Node.js。我將服務器回調('FS.cre ...')中的代碼複製了三次,以瞭解將多個流傳輸到套接字時會發生什麼。他們似乎沒有按順序進行,但他們並沒有相互重疊。這是可靠的,還是可能發生重疊?我的意思是,重複獲得兩個'==== HEADER ===='。 '_flush'只被調用一次?當所有的流都完成了?我認爲它會在每個'MyProtocolStream'完成時被調用。 – vfsoraki

+0

請參閱我編輯的答案。 – vfsoraki

+0

問題我的意思,當然:) – vfsoraki

0

這是一個great resource on streams

至於套接字的使用,當你發送/獲取套接字消息時,你發送類型和一個有效載荷。因此,您的通用套接字流可能是在發送數據時發送類型爲'file_data'的消息,並在末尾發送類型爲'eof'(文件末尾)的消息,並使用空有效負載。

+0

我已經閱讀過,但不是那麼明顯。這很像官方文檔https://nodejs.org/api/stream.html – vfsoraki

+1

您是否試圖遵循這些示例?我實現了它們。他們工作並給出如何使用流和創建定製流的相當好的想法。另外,請看看直通2,它可能會爲您簡化一些事情。 – Meir

+0

或者基於socket的流,例如:https://www.npmjs.com/package/socket.io-stream – Meir