2015-04-24 51 views
3

我有一個來自EventEmitter的Observable,它實際上只是一個http連接,流式事件。RxJs從另一個EventEmitter動態添加事件

偶爾我必須從基礎流斷開連接並重新連接。我不知道如何用rxjs來處理這個問題。

我不確定我是否可以完成一個源代碼,然後動態地將其他「源代碼」添加到源代碼中,或者如果我必須執行類似於最底層的操作。

var Rx = require('rx'), 
    EventEmitter = require('events').EventEmitter; 

    var eventEmitter = new EventEmitter(); 
    var eventEmitter2 = new EventEmitter(); 

    var source = Rx.Observable.fromEvent(eventEmitter, 'data') 

    var subscription = source.subscribe(function (data) { 
    console.log('data: ' + data); 
    }); 

    setInterval(function() { 
    eventEmitter.emit('data', 'foo'); 
    }, 500); 

    // eventEmitter stop emitting data, underlying connection closed 
    // now attach seconds eventemitter (new connection) 

    // something like this but obvouisly doesn't work 
    source 
    .fromEvent(eventEmitter2, 'data') 

Puesdo代碼,更多的是我在做什麼,我創建第二個流連接之前,我關閉第一個,所以我不「輸」的任何數據。在這裏,我不知道如何停止Observable,而不會因爲onNext由於緩衝區而未被調用而導致「丟失」記錄。

var streams = [], notifiers = []; 

    // create initial stream 
    createNewStream(); 

    setInterval(function() { 
    if (params of stream have changed) createNewStream(); 
    }, $1minutes/3); 

    function createNewStream() { 
    var stream = new eventEmitterStream(); 

    stream.once('connected', function() { 
     stopOthers(); 

     streams.push(stream); 
     createSource(stream, 'name', 'id'); 
    }); 
    } 

    function stopOthers() { 
    while(streams.length > 0) { 
     streams.pop().stop(); // stop the old stream 
    } 

    while(notifiers.length > 0) { 
     // if i call this, the buffer may lose records, before onNext() called 
     //notifiers.pop()(Rx.Notification.createOnCompleted()); 
    } 
    } 

    function createObserver(tag) { 
    return Rx.Observer.create(
     function (x) { 
     console.log('Next: ', tag, x.length, x[0], x[x.length-1]); 
     }, 
     function (err) { 
     console.log('Error: ', tag, err); 
     }, 
     function() { 
     console.log('Completed', tag); 
     }); 
    } 

    function createSource(stream, event, id) { 
    var source = Rx.Observable 
     .fromEvent(stream, event) 
     .bufferWithTimeOrCount(time, max); 

    var subscription = source.subscribe(createObserver(id)); 
    var notifier = subscription.toNotifier(); 
    notifiers.push(notifier); 
    } 
+1

在文檔中閱讀有關緩衝和背壓的部分。 –

+0

@BenjaminGruenbaum我已閱讀文檔。常規頁面和特定方法有很多例子。這是我得到緩衝區方法的地方。是否有您認爲最有用的特定網頁或部分? – dre

回答

8

首先和formost,你需要確保你可以從你以前的「死」發射器中刪除所有的聽衆。否則,你會創建一個泄漏的應用程序。

看起來你唯一知道EventEmitter已經死的唯一方法就是觀察頻率,除非你有一個錯誤或完成(斷開連接)時觸發的事件。後者很多,更加可取。

無論如何,Rx的祕密是確保你的數據流的創建和拆卸在你可觀察的。如果將可見性的發射器創建包裝起來,並且可以將其拆分,那麼可以使用諸如retry運算符之類的可怕東西來重新創建可觀察對象。

所以,如果你無法知道它死的方式,並且要重新連接時,可以使用這樣的事情:

// I'll presume you have some function to get an EventEmitter that 
// is already set up 
function getEmitter() { 
    var emitter = new EventEmitter(); 
    setInterval(function(){ 
    emitter.emit('data', 'foo'); 
    }, 500) 
    return emitter; 
} 


var emitterObservable = Observable.create(function(observer) { 
    // setup the data stream 
    var emitter = getEmitter(); 
    var handler = function(d) { 
    observer.onNext(d); 
    }; 
    emitter.on('data', handler); 

    return function() { 
    // tear down the data stream in your disposal function 
    emitter.removeListener('on', handler); 
    }; 
}); 

// Now you can do Rx magic! 
emitterObservable 
    // if it doesn't emit in 700ms, throw a timeout error 
    .timeout(700) 
    // catch all* errors and retry 
    // this means the emitter will be torn down and recreated 
    // if it times out! 
    .retry() 
    // do something with the values 
    .subscribe(function(x) { console.log(x); }); 

*注意:重試捕撈所有錯誤,所以你可能想要在它上面添加一個catch來處理非超時錯誤。由你決定。

+0

謝謝你讓我知道'重試'。我正在嘗試它。僅供參考:當我運行時,我得到了'RangeError:超過最大調用堆棧大小'。我認爲這只是因爲沒有任何東西從'getEmitter'返回 – dre

+0

我試圖用重新連接擊中的行爲是在我的輪到「重新連接」,沒有「丟失數據」。這需要先連接另一個事件發射器,然後斷開第一個事件。 – dre

+0

有趣的是,我需要更多的上下文,你在連接之間丟失了什麼數據? –