2011-12-13 134 views
3

我想了解如何將多個可觀察事件流式傳輸到Rx的一組事件中。但是當我運行下面的代碼時,我得到一個異常。那麼這是否意味着多個觀察者總是因爲違反Rx語法而容易出現異常?我的意思是,如果這些多個觀察者中的兩個偶然產生一個事件(任何兩個觀察者同時產生一些概率),它應該給出一個例外。觀察者可以安全地使用Rx監聽多個觀察對象嗎?

DateTimeOffset start; 
     object sync = new object(); 
     var subject = new Subject<long>(); 
     var observer = Observer.Create<long>(c => 
     { 
      lock (sync) 
      { 
       Console.WriteLine(c); 
      } 
     }) 
      ; 

     var observable1 = Observable.Interval(TimeSpan.FromSeconds(2)); 
     var observable2 = Observable.Interval(TimeSpan.FromSeconds(5)); 
     var observable3 = Observable.Never<long>().Timeout 
      (start = DateTimeOffset.Now.AddSeconds(15), 
      (new long[] { 1 }).ToObservable()); 
     var observable4 = Observable.Never<long>().Timeout(start); 
     observable1.Subscribe(observer); 
     observable2.Subscribe(observer); 
     observable3.Subscribe(observer); 
     observable4.Subscribe(observer); 
     Thread.Sleep(20000); 

感謝Gideon的解釋。這是我得到的例外。你是對的,這是一個時間錯覺。這是一個編碼錯誤。謝謝。

System.TimeoutException: The operation has timed out. 
    at System.Reactive.Observer.<Create>b__8[T](Exception e) 
    at System.Reactive.AnonymousObserver`1.Error(Exception exception) 
    at System.Reactive.AbstractObserver`1.OnError(Exception error) 
    at System.Reactive.Subjects.Subject`1.OnError(Exception error) 
    at System.Reactive.AnonymousObservable`1.AutoDetachObserver.Error(Exception e 
xception) 
    at System.Reactive.AbstractObserver`1.OnError(Exception error) 
    at System.Reactive.AnonymousObservable`1.AutoDetachObserver.Error(Exception e 
xception) 
    at System.Reactive.AbstractObserver`1.OnError(Exception error) 
    at System.Reactive.Linq.Observable.<>c__DisplayClass28c`1.<>c__DisplayClass28 
e.<Throw>b__28b() 
    at System.Reactive.Concurrency.Scheduler.Invoke(IScheduler scheduler, Action 
action) 
    at System.Reactive.Concurrency.ImmediateScheduler.Schedule[TState](TState sta 
te, Func`3 action) 
    at System.Reactive.Concurrency.Scheduler.Schedule(IScheduler scheduler, Actio 
n action) 
    at System.Reactive.Linq.Observable.<>c__DisplayClass28c`1.<Throw>b__28a(IObse 
rver`1 observer) 
    at System.Reactive.AnonymousObservable`1.<>c__DisplayClass1.<Subscribe>b__0() 

    at System.Reactive.Concurrency.Scheduler.Invoke(IScheduler scheduler, Action 
action) 
    at System.Reactive.Concurrency.ScheduledItem`2.InvokeCore() 
    at System.Reactive.Concurrency.ScheduledItem`1.Invoke() 
    at System.Reactive.Concurrency.CurrentThreadScheduler.Trampoline.Run() 
    at System.Reactive.Concurrency.CurrentThreadScheduler.Schedule[TState](TState 
state, TimeSpan dueTime, Func`3 action) 
    at System.Reactive.Concurrency.CurrentThreadScheduler.Schedule[TState](TState 
state, Func`3 action) 
    at System.Reactive.Concurrency.Scheduler.Schedule(IScheduler scheduler, Actio 
n action) 
    at System.Reactive.AnonymousObservable`1.Subscribe(IObserver`1 observer) 
    at System.Reactive.Linq.Observable.<>c__DisplayClass543`1.<>c__DisplayClass54 
5.<Timeout>b__53f() 
    at System.Reactive.Concurrency.Scheduler.Invoke(IScheduler scheduler, Action 
action) 
    at System.Reactive.Concurrency.ThreadPoolScheduler.<>c__DisplayClass8`1.<Sche 
dule>b__6(Object _) 
    at System.Threading._TimerCallback.TimerCallback_Context(Object state) 
    at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, C 
ontextCallback callback, Object state, Boolean ignoreSyncCtx) 
    at System.Threading._TimerCallback.PerformTimerCallback(Object state) 

回答

5

是的,一個觀察者可以聽到多個觀察對象。最好的例子是Merge運營商。內置的操作符都將遵循RX語法,並且通常會在不支持的源上執行。

IObserver你從Observer.Create就是這樣的一個情況。一旦調用OnError或OnCompleted,它將忽略以後對OnNext的調用。這確實意味着使用相同的觀察者訂閱一個observable,然後在第一個observable之後另一個observable將不起作用,因爲來自第一個observable的終止消息將導致觀察者忽略來自第二個observable的消息。爲了解決這個問題,像Merge,ConcatOnErrorResumeNext(等等)這樣的運營商在內部使用多個觀察者,並且不從最後一個可觀察到的外部傳遞完成消息(取決於操作符的語義的OnError和/或OnCompleted)觀察者。

你沒有提到你做了什麼異常,但我猜想,這是來自於你從observable4得到超時錯誤。如果你不提供另一個可觀測到使用超時,觀察者的OnError被調用,併爲SubscribeObserver.Create重載不採取錯誤處理程序的默認OnError是簡單地拋出異常。

雖然這很明顯是示例/測試代碼,但我確實想指出,即使您不再收到傳遞給您的消息OnNext,但在此異常之後,所有其他可觀察對象仍繼續運行。您可以使用Merge爲您跟蹤這些信息,也可以跟蹤描述中的所有一次性物品,並在完成消息傳出時自行處理它們。 CompositeDisposable(在System.Reactive.Disposables)對此很有幫助。

2

你真的不應該在這裏使用鎖定的情況下,但如果你真的想這個工作,你可以這樣做:

var x = Observable.Create<T>(subj => { /* Fill it in*/ }) 
    .Multicast(new Subject<T>()); 

// Set up your subscriptions Here! 

// When you call the Connect, whatever is in the Observable.Create will be called 
x.Connect(); 

如果你想成爲更安全,你可以把它,以便的創建將被「重播」你爲未來的訂閱,通過使用ReplaySubject而不是主題(而有主題,用戶後連接會得到什麼)

結果