我正在處理常見的場景,我想訪問倉庫的一個子集,而不必擔心必須保持更新,例如'得到所有價格大於10的訂單'。我已經實施了一個解決方案,但有兩個問題(最後列出)。使用Rx構建Observable倉庫
存儲庫的子集可以用等價的東西來實現
var expensiveOrders = Repository.GetOrders().Where(o => o.Price > 10);
但這是IEnumerable
當原始集合更新將不會被更新。我可以爲CollectionChanged
添加處理程序,但如果我們想訪問另一個子集,該怎麼辦?
var expensiveOrdersFromBob = expensiveOrders.Where(o => o.Name == Bob);
我們不得不爲這一個連接一個集合 - 更改。實時更新的概念讓我想到了Rx,因此我準備構建一個ObservableCache,其中包含自動更新自身的ObservableCollection
項目和用於通知的RX流。 (流也引擎蓋下有什麼更新緩存。)
class ObservableCache<T> : IObservableCache<T>
{
private readonly ObservableCollection<T> _cache;
private readonly IObservable<Tuple<T, CRUDOperationType>> _updates;
public ObservableCache(IEnumerable<T> initialCache
, IObservable<Tuple<T, CRUDOperationType>> currentStream, Func<T, bool> filter)
{
_cache = new ObservableCollection<T>(initialCache.Where(filter));
_updates = currentStream.Where(tuple => filter(tuple.Item1));
_updates.Subscribe(ProcessUpdate);
}
private void ProcessUpdate(Tuple<T, CRUDOperationType> update)
{
var item = update.Item1;
lock (_cache)
{
switch (update.Item2)
{
case CRUDOperationType.Create:
_cache.Add(item);
break;
case CRUDOperationType.Delete:
_cache.Remove(item);
break;
case CRUDOperationType.Replace:
case CRUDOperationType.Update:
_cache.Remove(item); // ToDo: implement some key-based equality
_cache.Add(item);
break;
}
}
}
public ObservableCollection<T> Cache
{
get { return _cache; }
}
public IObservable<T> Updates
{
get { return _updates.Select(tuple => tuple.Item1); }
}
public IObservableCache<T> Where(Func<T, bool> predicate)
{
return new ObservableCache<T>(_cache, _updates, predicate);
}
}
然後,您可以使用它像這樣:
var expensiveOrders = new ObservableCache<Order>(_orders
, updateStream
, o => o.Price > 10);
expensiveOrders.Updates.Subscribe
(o => Console.WriteLine("Got new expensive order: " + o));
_observableBoundToSomeCtrl = expensiveOrders.Cache;
var expensiveOrdersFromBob = expensiveOrders
.Where(o => o.Name == "Bob");
expensiveOrdersFromBob.Updates.Subscribe
(o => Console.WriteLine("Got new expensive order from Bob: " + o));
_observableBoundToSomeOtherCtrl = expensiveOrdersFromBob.Cache;
如此反覆,其想法是,你可以保持投影緩存分成更窄和更窄的子集,並且不必擔心它不同步。那麼我的問題是什麼呢?
- 我想知道是否可以通過使RX本質上更新集合來消除CRUD內容。也許用一個Select或者類似的東西來「投射」更新?
- 存儲庫與更新模式存在固有的競爭條件,因爲我在構建新的高速緩存時可能會錯過一些更新。我認爲我需要某種排序,但這意味着讓我的所有T對象實現一個
ISequenceableItem
接口。有沒有更好的方法來做到這一點? RX非常棒,因爲它可以處理所有線程。我想利用這一點。
關於鎖定的好處。在更新流之前創建緩存的問題是,您將在創建緩存和訂閱流之間的時間內丟失更新。我試着做一個Replay(10)來確保舊值被替換,但這有點不好意思。 –
我看了一下Rxx,但似乎無法找到任何與我的問題相符的東西。我也查看了ReactiveUI的ReactiveCollection,但它似乎並不適合處理更新。 我真正需要的是一個緩存,它可以自我更新,並且可以使用Where()子句將其投影到一個狹窄的視圖中。我會將此標記爲現在的答案。謝謝。 –