2013-07-04 54 views
17

如果可能,我想通過管道將兩個Node.js流合併爲一個。我正在使用Transform流。從兩個管道流創建一個Node.js流

換句話說,我想讓我的圖書館返回myStream供人們使用。例如,它們可以這樣寫:

process.stdin.pipe(myStream).pipe(process.stdout); 

和國內我使用第三方vendorStream,做了一些工作,插到我這包含在myInternalStream自己的邏輯。那麼上面的內容會轉化爲:

process.stdin.pipe(vendorStream).pipe(myInternalStream).pipe(process.stdout); 

我可以這樣做嗎?我試過var myStream = vendorStream.pipe(myInternalStream)但這顯然不起作用。

打個比方與bash,比方說,我想編寫一個程序來檢查,如果信h存在於一些流(tail -n 1 | grep h)的最後一行,我可以創建一個shell腳本:

# myscript.sh 
tail -n 1 | grep h 

然後,如果人做:

$ printf "abc\ndef\nghi" | . myscript.sh 

這只是工作。

這是我到目前爲止有:

// Combine a pipe of two streams into one stream 

var util = require('util') 
    , Transform = require('stream').Transform; 

var chunks1 = []; 
var stream1 = new Transform(); 
var soFar = ''; 
stream1._transform = function(chunk, encoding, done) { 
    chunks1.push(chunk.toString()); 
    var pieces = (soFar + chunk).split('\n'); 
    soFar = pieces.pop(); 
    for (var i = 0; i < pieces.length; i++) { 
    var piece = pieces[i]; 
    this.push(piece); 
    } 
    return done(); 
}; 

var chunks2 = []; 
var count = 0; 
var stream2 = new Transform(); 
stream2._transform = function(chunk, encoding, done) { 
    chunks2.push(chunk.toString()); 
    count = count + 1; 
    this.push(count + ' ' + chunk.toString() + '\n'); 
    done(); 
}; 

var stdin = process.stdin; 
var stdout = process.stdout; 

process.on('exit', function() { 
    console.error('chunks1: ' + JSON.stringify(chunks1)); 
    console.error('chunks2: ' + JSON.stringify(chunks2)); 
}); 
process.stdout.on('error', process.exit); 


// stdin.pipe(stream1).pipe(stream2).pipe(stdout); 

// $ (printf "abc\nd"; sleep 1; printf "ef\nghi\n") | node streams-combine.js 
// Outputs: 
// 1 abc 
// 2 def 
// 3 ghi 
// chunks1: ["abc\nd","ef\nghi\n"] 
// chunks2: ["abc","def","ghi"] 

// Best working solution I could find 
var stream3 = function(src) { 
    return src.pipe(stream1).pipe(stream2); 
}; 
stream3(stdin).pipe(stdout); 

// $ (printf "abc\nd"; sleep 1; printf "ef\nghi\n") | node streams-combine.js 
// Outputs: 
// 1 abc 
// 2 def 
// 3 ghi 
// chunks1: ["abc\nd","ef\nghi\n"] 
// chunks2: ["abc","def","ghi"] 

這是在所有可能的?讓我知道,如果我想做的事情不明確。

謝謝!

回答

25

你可以看的東西要通過管道輸送到你的流,然後unpipe它和管道它流您有興趣:

var PassThrough = require('stream').PassThrough; 

var stream3 = new PassThrough(); 

// When a source stream is piped to us, undo that pipe, and save 
// off the source stream piped into our internally managed streams. 
stream3.on('pipe', function(source) { 
    source.unpipe(this); 
    this.transformStream = source.pipe(stream1).pipe(stream2); 
}); 

// When we're piped to another stream, instead pipe our internal 
// transform stream to that destination. 
stream3.pipe = function(destination, options) { 
    return this.transformStream.pipe(destination, options); 
}; 

stdin.pipe(stream3).pipe(stdout); 

您可以提取這個功能集成到自己施工的流類:

var util = require('util'); 
var PassThrough = require('stream').PassThrough; 

var StreamCombiner = function() { 
    this.streams = Array.prototype.slice.apply(arguments); 

    this.on('pipe', function(source) { 
    source.unpipe(this); 
    for(i in this.streams) { 
     source = source.pipe(this.streams[i]); 
    } 
    this.transformStream = source; 
    }); 
}; 

util.inherits(StreamCombiner, PassThrough); 

StreamCombiner.prototype.pipe = function(dest, options) { 
    return this.transformStream.pipe(dest, options); 
}; 

var stream3 = new StreamCombiner(stream1, stream2); 
stdin.pipe(stream3).pipe(stdout); 
+0

非常感謝@brandon,這是真棒!更新了我的要點https://gist.github.com/nicolashery/5910969 –

+0

這太棒了。我正在考慮做類似的事情,但我只是沒有信心,我並沒有錯過一些讓我的解決方案錯誤的微妙之處。感謝您的信任 – FellowMD

+0

FWIW,爲了使這個解決方案能夠工作,您需要將stream3管道傳輸到標準輸出(在這種情況下是stdin)。所以,沒有stream3.pipe(stdout); stream3.write(數據);但這是一個很大的幫助!謝謝! –

2

一種選擇是可能使用multipipe它可以讓你連鎖多個變換一起包裝成一個單一的變換流:

// my-stream.js 
var multipipe = require('multipipe'); 

module.exports = function createMyStream() { 
    return multipipe(vendorStream, myInternalStream); 
}; 

然後,你可以這樣做:

var createMyStream = require('./my-stream'); 

var myStream = createMyStream(); 

process.stdin.pipe(myStream).pipe(process.stdout);