2012-05-21 42 views
0

我正在使用Silverlight的Reactive框架,並希望實現以下功能。Rx Amb擴展

我嘗試爲Silverlight客戶端創建一個典型的數據提供者,該客戶端也利用MS Ent Lib中可用的緩存框架。這些場景要求我必須在訪問WCF數據客戶端之前檢查緩存中的鍵值對。

通過使用Rx擴展Amb,我能夠從緩存或WCF數據客戶端(無論哪個返回首先)獲取數據,但是如果值處於緩存中,如何阻止WCF客戶端執行調用?

我也想考慮比賽條件,例如如果第一個訂閱者請求某些數據並且提供者正在從WCF數據客戶端(異步)中獲取數據,那麼如何阻止後續異步請求執行相同的操作(在此階段,緩存尚未填充)。

+0

您確定要同時嘗試兩種方法並選擇首先返回的方法嗎?或者你願意嘗試一下嗎?同時嘗試它們意味着每次都會觸發您的數據服務,無論應用程序是否使用緩存 - 這看起來並不像預期的行爲。 – yamen

+0

@yamen你是完全正確的,這種行爲是無意的,我正在尋找替代這一點。 –

+0

我會在這裏留下這個:https://github.com/github/akavache –

回答

1

我有完全相同的問題。我用以下簽名的擴展方法解決了這個問題:

IObservable<R> FromCacheOrFetch<T, R>(
    this IObservable<T> source, 
    Func<T, R> cache, 
    Func<IObservable<T>, IObservable<R>> fetch, 
    IScheduler scheduler) where R : class 

有效這是什麼做的是採取在源可觀察並返回一個可觀察的,將匹配其產值每個輸入值。

要獲得每個輸出值,它將首先檢查緩存。如果該值存在於它使用的緩存中。如果不是這樣,它將在不在緩存中的值上旋轉fetch函數只有。如果所有的值都在緩存中,那麼fetch函數將永遠不會被啓動 - 所以沒有服務連接設置懲罰等。

我給你的代碼,但它是基於一個稍微不同的版本該擴展方法使用了一個Maybe<T>單子 - 所以你可能會發現你需要擺弄實現。

這就是:

public static IObservable<R> FromCacheOrFetch<T, R>(this IObservable<T> source, Func<T, R> cache, Func<IObservable<T>, IObservable<R>> fetch, IScheduler scheduler) 
     where R : class 
    { 
     return source.FromCacheOrFetch<T, R>(t => cache(t).ToMaybe(null), fetch, scheduler); 
    } 

    public static IObservable<R> FromCacheOrFetch<T, R>(this IObservable<T> source, Func<T, Maybe<R>> cache, Func<IObservable<T>, IObservable<R>> fetch, IScheduler scheduler) 
    { 
     var results = new Subject<R>(); 

     var disposables = new CompositeDisposable(); 

     var loop = new EventLoopScheduler(); 
     disposables.Add(loop); 

     var sourceDone = false; 
     var pairsDone = true; 
     var exception = (Exception)null; 

     var fetchIn = new Subject<T>(); 
     var fetchOut = (IObservable<R>)null; 
     var pairs = (IObservable<KeyValuePair<int, R>>)null; 

     var lookup = new Dictionary<T, int>(); 
     var list = new List<Maybe<R>>(); 
     var cursor = 0; 

     Action checkCleanup =() => 
     { 
      if (sourceDone && pairsDone) 
      { 
       if (exception == null) 
       { 
        results.OnCompleted(); 
       } 
       else 
       { 
        results.OnError(exception); 
       } 
       loop.Schedule(() => disposables.Dispose()); 
      } 
     }; 

     Action dequeue =() => 
     { 
      while (cursor != list.Count) 
      { 
       var mr = list[cursor]; 
       if (mr.HasValue) 
       { 
        results.OnNext(mr.Value); 
        cursor++; 
       } 
       else 
       { 
        break; 
       } 
      } 
     }; 

     Action<KeyValuePair<int, R>> nextPairs = kvp => 
     { 
      list[kvp.Key] = Maybe<R>.Something(kvp.Value); 
      dequeue(); 
     }; 

     Action<Exception> errorPairs = ex => 
     { 
      fetchIn.OnCompleted(); 
      pairsDone = true; 
      exception = ex; 
      checkCleanup(); 
     }; 

     Action completedPairs =() => 
     { 
      pairsDone = true; 
      checkCleanup(); 
     }; 

     Action<T> sourceNext = t => 
     { 
      var mr = cache(t); 
      list.Add(mr); 
      if (mr.IsNothing) 
      { 
       lookup[t] = list.Count - 1; 
       if (fetchOut == null) 
       { 
        pairsDone = false; 
        fetchOut = fetch(fetchIn.ObserveOn(Scheduler.ThreadPool)); 
        pairs = fetchIn.Select(x => lookup[x]).Zip(fetchOut, (i, r2) => new KeyValuePair<int, R>(i, r2)); 
        disposables.Add(pairs.ObserveOn(loop).Subscribe(nextPairs, errorPairs, completedPairs)); 
       } 
       fetchIn.OnNext(t); 
      } 
      else 
      { 
       dequeue(); 
      } 
     }; 

     Action<Exception> errorSource = ex => 
     { 
      sourceDone = true; 
      exception = ex; 
      fetchIn.OnCompleted(); 
      checkCleanup(); 
     }; 

     Action completedSource =() => 
     { 
      sourceDone = true; 
      fetchIn.OnCompleted(); 
      checkCleanup(); 
     }; 

     disposables.Add(source.ObserveOn(loop).Subscribe(sourceNext, errorSource, completedSource)); 

     return results.ObserveOn(scheduler); 
    } 

使用例子是這樣的:

你必須要獲取指標的來源:

IObservable<X> source = ... 

你將有一個函數可以從緩存中獲取值,並且可以將它們放入一個操作(兩者都應該是這樣d安全):

Func<X, Y> getFromCache = x => ...; 
Action<X, Y> addToCache = (x, y) => ...; 

那麼你將有實際的電話會議,以獲得從您的數據庫或服務的數據:

Func<X, Y> getFromService = x => ...; 

則你可以定義fetch像這樣:

Func<IObservable<X>, IObservable<Y>> fetch = 
    xs => xs.Select(x => 
    { 
     var y = getFromService(x); 
     addToCache(x, y); 
     return y; 
    }); 

最後,您可以通過以下方式進行查詢:

IObservable<Y> results = 
    source.FromCacheOrFetch(
     getFromCache, 
     fetch, 
     Scheduler.ThreadPool); 

當然,您需要訂閱結果才能進行計算。

+0

請給我一個這個擴展的用法的簡單例子。這看起來像是我所追求的解決方案。 –

+0

@TriQ - 我添加了一個例子 - 這是相當基本的,但它可能會有所幫助。 – Enigmativity

+0

有一個問題,當兩個用戶希望同時獲取(服務調用返回並填充緩存所需的時間)時會發生什麼。這是否意味着服務會被擊兩次? –

0

顯然Amb不是正確的路,因爲每次都會觸發緩存和服務。如果緩存未命中,EntLib會返回什麼結果?

注意Observable.Timeout是一個合理的選擇:

cache(<paramters>).Timeout(TimeSpan.FromSeconds(1), service<paramters>); 

但顯然它不是一個偉大的想法,超時,如果你想,而不是處理來自EntLib返回,而是採取適當的行動。

我不明白爲什麼這一定是一個反應性擴展問題。

+0

如果緩存不存在,緩存將返回null。 –

0

一種簡單的方法,這可能是比@ Enigmativity的解決方案可能是沿着線的東西少功能齊全:

public IObservable<T> GetCachedValue<TKey, TResult>(TKey key, Func<TKey, TResult> getFromCache, Func<TKey, TResult> getFromSource) 
{ 
    return getFromCache(<key>).Concat(getFromSource(<key>).Take(1); 
} 

這只是一個鬆散的想法,你需要添加:

  • 一種機制來將項目添加到緩存中,或承擔getFromSource緩存結果
  • 某種類型的線程安全的,以防止源上的多個命中爲同一個未緩存鍵(如果需要)
  • 如果項目不在緩存中,getFromCache將需要返回Observable.Empty()。

但是,如果你想要簡單的東西,這是一個不錯的地方開始。