4

我正在處理常見的場景,我想訪問倉庫的一個子集,而不必擔心必須保持更新,例如'得到所有價格大於10的訂單'。我已經實施了一個解決方案,但有兩個問題(最後列出)。使用Rx構建Observable倉庫

存儲庫的子集可以用等價的東西來實現

var expensiveOrders = Repository.GetOrders().Where(o => o.Price > 10);

但這是IEnumerable當原始集合更新將不會被更新。我可以爲CollectionChanged添加處理程序,但如果我們想訪問另一個子集,該怎麼辦?

var expensiveOrdersFromBob = expensiveOrders.Where(o => o.Name == Bob); 

我們不得不爲這一個連接一個集合 - 更改。實時更新的概念讓我想到了Rx,因此我準備構建一個ObservableCache,其中包含自動更新自身的ObservableCollection項目和用於通知的RX流。 (流也引擎蓋下有什麼更新緩存。)

class ObservableCache<T> : IObservableCache<T> 
{ 
    private readonly ObservableCollection<T> _cache; 
    private readonly IObservable<Tuple<T, CRUDOperationType>> _updates; 

    public ObservableCache(IEnumerable<T> initialCache 
     , IObservable<Tuple<T, CRUDOperationType>> currentStream, Func<T, bool> filter) 
     { 
      _cache = new ObservableCollection<T>(initialCache.Where(filter)); 
      _updates = currentStream.Where(tuple => filter(tuple.Item1)); 
      _updates.Subscribe(ProcessUpdate); 
     } 

    private void ProcessUpdate(Tuple<T, CRUDOperationType> update) 
    { 
     var item = update.Item1; 
     lock (_cache) 
     { 
      switch (update.Item2) 
      { 
       case CRUDOperationType.Create: 
        _cache.Add(item); 
        break; 
       case CRUDOperationType.Delete: 
        _cache.Remove(item); 
        break; 
       case CRUDOperationType.Replace: 
       case CRUDOperationType.Update: 
        _cache.Remove(item); // ToDo: implement some key-based equality 
        _cache.Add(item); 
        break; 
      } 
     } 
    } 

    public ObservableCollection<T> Cache 
    { 
     get { return _cache; } 
    } 

    public IObservable<T> Updates 
    { 
     get { return _updates.Select(tuple => tuple.Item1); } 
    } 

    public IObservableCache<T> Where(Func<T, bool> predicate) 
    { 
     return new ObservableCache<T>(_cache, _updates, predicate); 
    } 
} 

然後,您可以使用它像這樣:

var expensiveOrders = new ObservableCache<Order>(_orders 
               , updateStream 
               , o => o.Price > 10); 
expensiveOrders.Updates.Subscribe 
    (o => Console.WriteLine("Got new expensive order: " + o)); 
_observableBoundToSomeCtrl = expensiveOrders.Cache; 

var expensiveOrdersFromBob = expensiveOrders 
          .Where(o => o.Name == "Bob"); 
expensiveOrdersFromBob.Updates.Subscribe 
     (o => Console.WriteLine("Got new expensive order from Bob: " + o)); 
_observableBoundToSomeOtherCtrl = expensiveOrdersFromBob.Cache; 

如此反覆,其想法是,你可以保持投影緩存分成更窄和更窄的子集,並且不必擔心它不同步。那麼我的問題是什麼呢?

  1. 我想知道是否可以通過使RX本質上更新集合來消除CRUD內容。也許用一個Select或者類似的東西來「投射」更新?
  2. 存儲庫與更新模式存在固有的競爭條件,因爲我在構建新的高速緩存時可能會錯過一些更新。我認爲我需要某種排序,但這意味着讓我的所有T對象實現一個ISequenceableItem接口。有沒有更好的方法來做到這一點? RX非常棒,因爲它可以處理所有線程。我想利用這一點。

回答

0

您不應該在ProcessUpdate之內鎖定_cache。如果您可觀察到的來源currentStream正在兌現Rx指南,那麼您一次只能在OnNext的單個調用中進行。換句話說,當你還在處理先前的值時,你不會從流中接收到另一個值。

解決您的競爭條件的唯一可靠方法是確保您在updateStream開始生成數據之前創建緩存。您可能想看看Extensions for Reactive Extensions (Rxx)。我相信Dave已經構建了許多用於將UI控件綁定到可觀察數據的實用程序。文檔很少。我不知道你在做什麼,有沒有什麼。

+0

關於鎖定的好處。在更新流之前創建緩存的問題是,您將在創建緩存和訂閱流之間的時間內丟失更新。我試着做一個Replay(10)來確保舊值被替換,但這有點不好意思。 –

+0

我看了一下Rxx,但似乎無法找到任何與我的問題相符的東西。我也查看了ReactiveUI的ReactiveCollection,但它似乎並不適合處理更新。 我真正需要的是一個緩存,它可以自我更新,並且可以使用Where()子句將其投影到一個狹窄的視圖中。我會將此標記爲現在的答案。謝謝。 –

0

假設你有一個這樣的定義:

class SetOp<T> 
{ 
    public T Value { get; private set; } 
    public bool Include { get; private set; } 
    public SetOp(T value, bool include) 
    { 
     Value = value; 
     Include = include; 
    } 
} 

使用Observable.ScanSystem.Collections.Immutable你CA ñ做這樣的事情:

IObservable<SetOp<int>> ops = ...; 
IImmutableSet<int> empty = ImmutableSortedSet<int>.Empty; 
var observableSet = ops 
    .Scan(empty, (s, op) => op.Include ? s.Add(op.Value) : s.Remove(op.Value)) 
    .StartWith(empty); 

使用不可改變的集合類型是這裏的關鍵絕招:observableSet的任何觀察者可以爲所欲爲與在它推動了價值,因爲它們是不可變的。添加它是有效的,因爲它在連續值之間重用了大部分集合數據結構。

以下是ops流和相應的observableSet的示例。

ops  observableSet 
-------- ------------------ 
      {} 
Add 7  {7} 
Add 4  {4,7} 
Add 5  {4,5,7} 
Add 6  {4,5,6,7} 
Remove 5 {4,6,7} 
Add 8  {4,6,7,8} 
Remove 4 {6,7,8}