2012-03-06 28 views
0

完整地傳播不同的事件流我需要通過一個主題代理所有不同的事件流。使用一個主題通過

我想出了這個代碼:

var mySubject, 
    getObservable; 

getObservable = function (subject, eventName) { 
    return subject 
     .asObservable() 
     .filter(function (x) { 
      return x.EventName === eventName; 
     }) 
     .flatMap(function (x) { 
      if (x.Type === 'onNext') { 
       return Rx.Observable.return(x.Data); 
      } 

      if (x.Type === 'onError') { 
       return Rx.Observable.throw(x.Data); 
      } 

      return Rx.Observable.empty(); 
     }); 
}; 

mySubject = new Rx.Subject(); 

getObservable(mySubject, 'foo') 
    .subscribe(function(x){ 
     console.log('foo onNext ' + x); 
    }, function(x){ 
     console.log('foo onError ' + x); 
    }, function(){ 
     console.log('foo onComplete'); 
    }); 

getObservable(mySubject, 'bar') 
    .subscribe(function(x){ 
     console.log('bar onNext ' + x); 
    }, function(x){ 
     console.log('bar onError ' + x); 
    }, function(){ 
     console.log('bar onComplete'); 
    }); 

mySubject.onNext({Type: 'onNext', EventName: 'foo', Data: 5}); 
mySubject.onNext({Type: 'onCompleted', EventName: 'foo'}); 

mySubject.onNext({Type: 'onNext', EventName: 'bar', Data: 5}); 
mySubject.onNext({Type: 'onError', EventName: 'bar', Data: 'Error message'}); 

得到輸出:

foo onNext 5 

bar onNext 5 
bar onError Error message 

預期輸出:

foo onNext 5 
foo onCompleted 

bar onNext 5 
bar onError Error message 

對於bar事件,這就像一個魅力:onNext將會傳播d並且一旦錯誤發生,onError函數被調用並且事件流結束。但是,我無法讓它爲onComplete工作。

每當一個完整的通知提出我看到,Rx.Observable.empty()被調用,但不會導致訂戶onComplete處理程序被調用。相反,它調用它的onNext處理程序。

回答

1

getObservable函數返回一個observable,該observable訂閱通過subject發送的eventName事件。

let getObservable = function (subject, eventName) { 
    return Rx.Observable.create(function (observer) { 
     subject 
      .asObservable() 
      .filter(function(x) { 
       return x.EventName === eventName; 
      }) 
      .map(function(x) { 
       if (x.Type === 'onNext') { 
        observer.onNext(x.Data); 
       } 

       if (x.Type === 'onError') { 
        observer.onError(x.Data); 
       } 

       if (x.Type === 'onCompleted') { 
        observer.onCompleted(); 
       } 

       return x; 
      }) 
      .subscribe(); 
    }); 
}; 

這是使用來自原始質詢數據的工作示例:

var mySubject, 
 
    getObservable; 
 

 
getObservable = function (subject, eventName) { 
 
    return Rx.Observable.create(function (observer) { 
 
     subject 
 
      .asObservable() 
 
      .filter(function(x) { 
 
       return x.EventName === eventName; 
 
      }) 
 
      .map(function(x) { 
 
       if (x.Type === 'onNext') { 
 
        observer.onNext(x.Data); 
 
       } 
 

 
       if (x.Type === 'onError') { 
 
        observer.onError(x.Data); 
 
       } 
 
       
 
       if (x.Type === 'onCompleted') { 
 
        observer.onCompleted(); 
 
       } 
 
       
 
       return x; 
 
      }) 
 
      .subscribe(); 
 
    }); 
 
}; 
 

 
mySubject = new Rx.Subject(); 
 

 
getObservable(mySubject, 'foo') 
 
    .subscribe(function(x){ 
 
     console.log('SomethingHappened onNext ' + x); 
 
    }, function(x){ 
 
     console.log('SomethingHappened onError ' + x); 
 
    }, function(){ 
 
     console.log('SomethingHappened onComplete'); 
 
    }); 
 

 

 
getObservable(mySubject, 'bar') 
 
    .subscribe(function(x){ 
 
     console.log('DataUpdated onNext ' + x); 
 
    }, function(x){ 
 
     console.log('DataUpdated onError ' + x); 
 
    }, function(){ 
 
     console.log('DataUpdated onComplete'); 
 
    }); 
 

 
mySubject.onNext({Type: 'onNext', EventName: 'foo', Data: 5}); 
 
mySubject.onNext({Type: 'onCompleted', EventName: 'foo'}); 
 

 
mySubject.onNext({Type: 'onNext', EventName: 'bar', Data: 5}); 
 
mySubject.onNext({Type: 'onError', EventName: 'bar', Data: 'Error message'});
<script src='https://rawgit.com/Reactive-Extensions/RxJS/master/dist/rx.all.js'></script>

+0

偉大的工作邁克。不過,我不明白爲什麼我最初的嘗試不起作用。 – Christoph 2012-03-06 13:30:26

0

在.NET Observable.SelectMany使用Observable.Merge合併到一起流引導到一個複合觀察。恕我直言Observable.Merge只有在任何合併的觀察結束時纔會完成。

例如http://theburningmonk.com/2010/02/rx-framework-iobservable-merge/

這可能是問題的原因。

+0

是的,可以。但是你的解決方案也適用。考慮將其重構爲我的更新版本,因爲你的函數作爲dispose函數返回一個空函數,而我的更新版本將返回dispose函數。另外,它不使用選擇,但訂閱做的工作。 (選擇應該是免費的) – Christoph 2012-03-07 09:30:52

+0

是的,有道理。把SignalR和RxJs結合起來的好主意。我會玩弄時間。 – 2012-03-07 13:03:40

+0

如果你對它感興趣,你可以在這裏查看https://github.com/cburgdorf/SignalR.Reactive – Christoph 2012-03-07 20:20:29

相關問題