使用Rx,我想觀察一個傳統對象,該對象公開了方法GetItems
和事件NewItem
。觀察同步結果和異步結果
當調用GetItems
時,它將同步返回緩存中所有項的列表。它還會產生異步獲取項目,這些項目將在收到時通過NewItem
事件發佈。
如何通過編寫LINQ查詢以一致的方式觀察這兩個源(sync + async),以便捕獲這兩個結果集?生產訂單並不重要。
使用Rx,我想觀察一個傳統對象,該對象公開了方法GetItems
和事件NewItem
。觀察同步結果和異步結果
當調用GetItems
時,它將同步返回緩存中所有項的列表。它還會產生異步獲取項目,這些項目將在收到時通過NewItem
事件發佈。
如何通過編寫LINQ查詢以一致的方式觀察這兩個源(sync + async),以便捕獲這兩個結果集?生產訂單並不重要。
讓我看看我是否理解你的遺留物體。我假設它是一個通用型,看起來像這樣:
public class LegacyObject<T>
{
public IEnumerable<T> GetItems();
public event EventHandler<NewItemEventArgs<T>> NewItem;
}
隨着新項目的事件參數是這樣的:
public class NewItemEventArgs<T> : System.EventArgs
{
public T NewItem { get; private set; }
public NewItemEventArgs(T newItem)
{
this.NewItem = newItem;
}
}
現在,我已經創建了一個LegacyObject<T>
擴展.ToObservable()
方法:
public static IObservable<T> ToObservable<T>(
this LegacyObject<T> @this)
{
return Observable.Create<T>(o =>
{
var gate = new object();
lock (gate)
{
var list = new List<T>();
var subject = new Subject<T>();
var newItems = Observable
.FromEventPattern<NewItemEventArgs<T>>(
h => @this.NewItem += h,
h => @this.NewItem -= h)
.Select(ep => ep.EventArgs.NewItem);
var inner = newItems.Subscribe(ni =>
{
lock (gate)
{
if (!list.Contains(ni))
{
list.Add(ni);
subject.OnNext(ni);
}
}
});
list.AddRange(@this.GetItems());
var outer = list.ToArray().ToObservable()
.Concat(subject).Subscribe(o);
return new CompositeDisposable(inner, outer);
}
});
}
此方法創建爲每個用戶一個新的觀察到 - 這是編寫擴展方法,像這樣的時候做正確的事情。
它創建一個gate
對象來鎖定對內部列表的訪問。
因爲您表示致電GetItems
的行爲會產生異步功能以獲取新項目,所以我確保NewItem
訂閱是在致電GetItems
之前創建的。
inner
訂閱會檢查新項目是否在列表中,如果該項目不在列表中,只會調用OnNext
。
對GetItems
進行調用並將值通過AddRange
添加到內部列表中。
在NewItem
事件開始在另一個線程上觸發之前,不太可能但可能會將項目添加到列表中。這就是爲什麼鎖定訪問列表的原因。 inner
訂閱將等待,直到它可以獲取鎖定,然後再嘗試將項目添加到列表中,這會在初始項目添加到列表後發生。
最後,內部列表變成可觀察值,與主題連接,並且觀察者訂閱此可觀察值。
使用CompositeDisposable
將兩個訂閱作爲單個IDisposable
返回。
這就是它的ToObservable
方法。
現在,我通過在遺留對象上創建一個構造函數來測試這個問題,它允許我傳遞一個枚舉值和一個可觀察值。當調用GetItems
並且可觀察值驅動NewItem
事件時,將返回枚舉值。
所以,我的測試代碼是這樣的:
var tester = new Subject<int>();
var legacy = new LegacyObject<int>(new [] { 1, 2, 3, }, tester);
var values = legacy.ToObservable();
values.Subscribe(v => Console.WriteLine(v));
tester.OnNext(3);
tester.OnNext(4);
tester.OnNext(4);
tester.OnNext(5);
並寫入到控制檯中值分別爲:
1
2
3
4
5
讓我知道這是否符合您的需求。
謝謝 - 我不知所措!你說*這個方法爲每個訂閱者創建一個新的可觀察對象 - 這是寫這樣的擴展方法時要做的正確的事情。* - 你有一些我可以閱讀的文本的參考嗎? –
@JohannGerell - 請參閱[Rx設計指南](http://go.microsoft.com/fwlink/?LinkID=205219)第17頁。 – Enigmativity
完美! (在問這個問題之前,我實際上瀏覽了那個文檔,但是那個特定的內容並沒有粘住...... ;-) –
如果我理解正確你的問題,也可能是使用像這樣做:
legacyObject.GetItems().ToObservable()
.Merge(
Observable.FromEventPattern<IntEventArgs>(legacyObject, "NewItem")
.Select(e => e.EventArgs.Value));
編輯:Enigmativity發現上面的解決方案(見下文評論)的競爭條件。這個有望解決這個問題:
Observable.FromEventPattern<IntEventArgs>(legacyObject, "NewItem")
.Select(e => e.EventArgs.Value)
.Merge(Observable.Defer(() => legacyObject.GetItems().ToObservable()));
這是我的測試代碼的其餘部分,如果它有幫助。我不知道我是否準確地模擬了你的遺產班級:
class IntEventArgs : EventArgs
{
public IntEventArgs(int value)
{
Value = value;
}
public int Value { get; private set; }
}
class Legacy
{
public event EventHandler<IntEventArgs> NewItem;
public IList<int> GetItems()
{
GenerateNewItemAsync();
return new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 };
}
private void GenerateNewItemAsync()
{
Task.Factory.StartNew(() =>
{
for (var i = 0; i < 100000; ++i)
{
var handler = NewItem;
if (handler != null) handler(this, new IntEventArgs(i));
Thread.Sleep(500);
}
});
}
}
class Example
{
static void Main()
{
var legacyObject = new Legacy();
legacyObject.GetItems().ToObservable()
.Merge(
Observable.FromEventPattern<IntEventArgs>(legacyObject, "NewItem")
.Select(e => e.EventArgs.Value))
.Subscribe(Console.WriteLine);
Console.ReadLine();
}
}
真的很好!遺產班是現場。 –
該解決方案存在爭用條件,因爲在事件處理程序訂閱之前,'GetItems'在'i'等於'0'時觸發第一個值。事實上,如果你從'GenerateNewItemAsync'方法中取出'Thread.Sleep(50)',那麼在我的測試中,會丟失前34個值(0到33)。 – Enigmativity
好的。如果我扭轉小河的構成怎麼樣?讓我編輯我的答案,並告訴我你的想法。 –
你是什麼意思的'一致'?你的意思是「在處理來自NewItem的任何項目之前,處理來自GetItems的所有項目?」 –
@Chris:對不起模糊 - 我想制定一個LINQ查詢,以便捕獲兩個結果集。生產訂單並不重要。 –
@JohannGerell:Enigmativity在我原來的解決方案中發現了一個競爭條件。請看我更新的答案。 –