我有一個來自EventEmitter的Observable,它實際上只是一個http連接,流式事件。RxJs從另一個EventEmitter動態添加事件
偶爾我必須從基礎流斷開連接並重新連接。我不知道如何用rxjs來處理這個問題。
我不確定我是否可以完成一個源代碼,然後動態地將其他「源代碼」添加到源代碼中,或者如果我必須執行類似於最底層的操作。
var Rx = require('rx'),
EventEmitter = require('events').EventEmitter;
var eventEmitter = new EventEmitter();
var eventEmitter2 = new EventEmitter();
var source = Rx.Observable.fromEvent(eventEmitter, 'data')
var subscription = source.subscribe(function (data) {
console.log('data: ' + data);
});
setInterval(function() {
eventEmitter.emit('data', 'foo');
}, 500);
// eventEmitter stop emitting data, underlying connection closed
// now attach seconds eventemitter (new connection)
// something like this but obvouisly doesn't work
source
.fromEvent(eventEmitter2, 'data')
Puesdo代碼,更多的是我在做什麼,我創建第二個流連接之前,我關閉第一個,所以我不「輸」的任何數據。在這裏,我不知道如何停止Observable,而不會因爲onNext由於緩衝區而未被調用而導致「丟失」記錄。
var streams = [], notifiers = [];
// create initial stream
createNewStream();
setInterval(function() {
if (params of stream have changed) createNewStream();
}, $1minutes/3);
function createNewStream() {
var stream = new eventEmitterStream();
stream.once('connected', function() {
stopOthers();
streams.push(stream);
createSource(stream, 'name', 'id');
});
}
function stopOthers() {
while(streams.length > 0) {
streams.pop().stop(); // stop the old stream
}
while(notifiers.length > 0) {
// if i call this, the buffer may lose records, before onNext() called
//notifiers.pop()(Rx.Notification.createOnCompleted());
}
}
function createObserver(tag) {
return Rx.Observer.create(
function (x) {
console.log('Next: ', tag, x.length, x[0], x[x.length-1]);
},
function (err) {
console.log('Error: ', tag, err);
},
function() {
console.log('Completed', tag);
});
}
function createSource(stream, event, id) {
var source = Rx.Observable
.fromEvent(stream, event)
.bufferWithTimeOrCount(time, max);
var subscription = source.subscribe(createObserver(id));
var notifier = subscription.toNotifier();
notifiers.push(notifier);
}
在文檔中閱讀有關緩衝和背壓的部分。 –
@BenjaminGruenbaum我已閱讀文檔。常規頁面和特定方法有很多例子。這是我得到緩衝區方法的地方。是否有您認爲最有用的特定網頁或部分? – dre