2011-10-10 77 views
3

使用Rx,我想觀察一個傳統對象,該對象公開了方法GetItems和事件NewItem觀察同步結果和異步結果

當調用GetItems時,它將同步返回緩存中所有項的列表。它還會產生異步獲取項目,這些項目將在收到時通過NewItem事件發佈。

如何通過編寫LINQ查詢以一致的方式觀察這兩個源(sync + async),以便捕獲這兩個結果集?生產訂單並不重要。

+0

你是什麼意思的'一致'?你的意思是「在處理來自NewItem的任何項目之前,處理來自GetItems的所有項目?」 –

+0

@Chris:對不起模糊 - 我想制定一個LINQ查詢,以便捕獲兩個結果集。生產訂單並不重要。 –

+0

@JohannGerell:Enigmativity在我原來的解決方案中發現了一個競爭條件。請看我更新的答案。 –

回答

2

讓我看看我是否理解你的遺留物體。我假設它是一個通用型,看起來像這樣:

public class LegacyObject<T> 
{ 
    public IEnumerable<T> GetItems(); 
    public event EventHandler<NewItemEventArgs<T>> NewItem; 
} 

隨着新項目的事件參數是這樣的:

public class NewItemEventArgs<T> : System.EventArgs 
{ 
    public T NewItem { get; private set; } 
    public NewItemEventArgs(T newItem) 
    { 
     this.NewItem = newItem; 
    } 
} 

現在,我已經創建了一個LegacyObject<T>擴展.ToObservable()方法:

public static IObservable<T> ToObservable<T>(
    this LegacyObject<T> @this) 
{ 
    return Observable.Create<T>(o => 
    { 
     var gate = new object(); 
     lock (gate) 
     { 
      var list = new List<T>(); 
      var subject = new Subject<T>(); 
      var newItems = Observable 
       .FromEventPattern<NewItemEventArgs<T>>(
        h => @this.NewItem += h, 
        h => @this.NewItem -= h) 
       .Select(ep => ep.EventArgs.NewItem); 
      var inner = newItems.Subscribe(ni => 
      { 
       lock (gate) 
       { 
        if (!list.Contains(ni)) 
        { 
         list.Add(ni); 
         subject.OnNext(ni); 
        } 
       } 
      }); 
      list.AddRange(@this.GetItems()); 
      var outer = list.ToArray().ToObservable() 
       .Concat(subject).Subscribe(o); 
      return new CompositeDisposable(inner, outer); 
     } 
    }); 
} 

此方法創建爲每個用戶一個新的觀察到 - 這是編寫擴展方法,像這樣的時候做正確的事情。

它創建一個gate對象來鎖定對內部列表的訪問。

因爲您表示致電GetItems的行爲會產生異步功能以獲取新項目,所以我確保NewItem訂閱是在致電GetItems之前創建的。

inner訂閱會檢查新項目是否在列表中,如果該項目不在列表中,只會調用OnNext

GetItems進行調用並將值通過AddRange添加到內部列表中。

NewItem事件開始在另一個線程上觸發之前,不太可能但可能會將項目添加到列表中。這就是爲什麼鎖定訪問列表的原因。 inner訂閱將等待,直到它可以獲取鎖定,然後再嘗試將項目添加到列表中,這會在初始項目添加到列表後發生。

最後,內部列表變成可觀察值,與主題連接,並且觀察者訂閱此可觀察值。

使用CompositeDisposable將兩個訂閱作爲單個IDisposable返回。

這就是它的ToObservable方法。

現在,我通過在遺留對象上創建一個構造函數來測試這個問題,它允許我傳遞一個枚舉值和一個可觀察值。當調用GetItems並且可觀察值驅動NewItem事件時,將返回枚舉值。

所以,我的測試代碼是這樣的:

var tester = new Subject<int>(); 
var legacy = new LegacyObject<int>(new [] { 1, 2, 3, }, tester); 

var values = legacy.ToObservable(); 

values.Subscribe(v => Console.WriteLine(v)); 

tester.OnNext(3); 
tester.OnNext(4); 
tester.OnNext(4); 
tester.OnNext(5); 

並寫入到控制檯中值分別爲:

1 
2 
3 
4 
5 

讓我知道這是否符合您的需求。

+0

謝謝 - 我不知所措!你說*這個方法爲每個訂閱者創建一個新的可觀察對象 - 這是寫這樣的擴展方法時要做的正確的事情。* - 你有一些我可以閱讀的文本的參考嗎? –

+0

@JohannGerell - 請參閱[Rx設計指南](http://go.microsoft.com/fwlink/?LinkID=205219)第17頁。 – Enigmativity

+0

完美! (在問這個問題之前,我實際上瀏覽了那個文檔,但是那個特定的內容並沒有粘住...... ;-) –

2

如果我理解正確你的問題,也可能是使用像這樣做:

legacyObject.GetItems().ToObservable() 
    .Merge(
     Observable.FromEventPattern<IntEventArgs>(legacyObject, "NewItem") 
      .Select(e => e.EventArgs.Value)); 

編輯:Enigmativity發現上面的解決方案(見下文評論)的競爭條件。這個有望解決這個問題:

Observable.FromEventPattern<IntEventArgs>(legacyObject, "NewItem") 
    .Select(e => e.EventArgs.Value) 
    .Merge(Observable.Defer(() => legacyObject.GetItems().ToObservable())); 

這是我的測試代碼的其餘部分,如果它有幫助。我不知道我是否準確地模擬了你的遺產班級:

class IntEventArgs : EventArgs 
{ 
    public IntEventArgs(int value) 
    { 
     Value = value; 
    } 

    public int Value { get; private set; } 
} 

class Legacy 
{ 
    public event EventHandler<IntEventArgs> NewItem; 

    public IList<int> GetItems() 
    { 
     GenerateNewItemAsync(); 
     return new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 }; 
    } 

    private void GenerateNewItemAsync() 
    { 
     Task.Factory.StartNew(() => 
     { 
      for (var i = 0; i < 100000; ++i) 
      { 
       var handler = NewItem; 
       if (handler != null) handler(this, new IntEventArgs(i)); 
       Thread.Sleep(500); 
      } 
     }); 
    } 
} 

class Example 
{ 
    static void Main() 
    { 
     var legacyObject = new Legacy(); 

     legacyObject.GetItems().ToObservable() 
      .Merge(
       Observable.FromEventPattern<IntEventArgs>(legacyObject, "NewItem") 
        .Select(e => e.EventArgs.Value)) 
      .Subscribe(Console.WriteLine); 

     Console.ReadLine(); 
    } 
} 
+0

真的很好!遺產班是現場。 –

+1

該解決方案存在爭用條件,因爲在事件處理程序訂閱之前,'GetItems'在'i'等於'0'時觸發第一個值。事實上,如果你從'GenerateNewItemAsync'方法中取出'Thread.Sleep(50)',那麼在我的測試中,會丟失前34個值(0到33)。 – Enigmativity

+0

好的。如果我扭轉小河的構成怎麼樣?讓我編輯我的答案,並告訴我你的想法。 –