2011-07-22 146 views
2

我有一個可觀察的事件對象序列和一些觀察者處理特定類型的事件。我需要完成以下方案:Rx中的「獨佔」和「默認」訂閱模式

  1. 有些事件類型需要由第一觀察者匹配的條件來處理(例如observable.SubscribeExclusively(X => {}),併成爲「不可見的」到其他
  2. 如果沒有訂閱,請設置一些默認處理程序(例如observable.SubscribeIfNoSubscriptions(x => {})),以便不會丟失任何項目(例如,此處理程序可以將項目保存到數據庫中,以便稍後處理) 。

有沒有辦法來處理這些案件,其中Rx?

+1

我可能在這裏是錯誤的,但好像你需要對觀察者有點太多瞭解。在你的情況下,我可能會創建一個IObservable >,以便第一個用戶得到Observable A,其餘的得到Observable B.我沒有很好的建議#2,這就是爲什麼我沒有回答。 –

+0

感謝您的評論。我認爲在解決方案中,即使IEnumerable >也足夠了。然而,我感興趣的是一個更普遍的問題,那就是一個用戶如何阻止其他用戶接收事件......現在看來我最合理的是現在有一些不太類似的輸入和輸出序列遠離你所建議的解決方案。 –

回答

0

我不知道我相當神交您的方案,但如何做到這一點的罷工您:

IObservable<Event> streamOfEvents.SelectMany(x => { 
    if (matchesExclusiveItem1(x)) { 
     x += exclusiveItem1Handler; 
     return Observable.Empty<Event>(); 
    } 

    // Default case 
    return Observable.Return(x); 
}).Subscribe(x => { 
    // Default case 
    x += defaultHandler; 
}); 

我使用「事件對象」,因爲你指定的那是什麼,但它很可能會更好使用IObservable<IObservable<T>> - 這個選擇器有副作用(連接事件),這是不好的。

+0

感謝您的回答。 –

1

「排他性」更容易 - 您只需讓其他人訂閱獨家觀察員的過濾輸出。

「默認」更難 - RX編程是一種功能性編程,用戶彼此不知道,而根據定義,「默認」用戶意味着在觀察者之間具有某種狀態共享。共享狀態的一種方法是使用ConcurrentBagTPL DataFlow的BufferBlock創建生產者/消費者隊列。另一種方法是使用這樣一類的「處理」狀態附加到事件本身:

public class Handled<T> 
{ 
    public bool IsHandled { get; set; } 
    public T Data { get; set; } 
} 

在任何情況下,你將不得不給觀察員一段時間才能使用「默認」處理程序之前作出反應。下面的代碼說明了「獨佔」和「默認」的概念:

 var source = new[] {0, 1, 2, 3, 4}.ToObservable(); 
     var afterExclusive = source 
      .Where(x => 
         { 
          if (x == 0) 
          { 
           Console.WriteLine("exclusive"); 
           return false; 
          } 
          return true; 
         }) 
      .Select(x => new Handled<int> {Data = x}) 
      .Publish(); // publish is a must otherwise 
     afterExclusive // we'll get non shared objects 
      .Do(x => { x.IsHandled = true; }) 
      .Subscribe(); 
     afterExclusive 
      .Delay(TimeSpan.FromSeconds(1)) 
      .Where(x => !x.IsHandled) 
      .Subscribe(x => Console.WriteLine("missed by all {0}", x)); 
     afterExclusive.Connect(); 
+0

很好的答案!謝謝 –