我有完全相同的問題。我用以下簽名的擴展方法解決了這個問題:
IObservable<R> FromCacheOrFetch<T, R>(
this IObservable<T> source,
Func<T, R> cache,
Func<IObservable<T>, IObservable<R>> fetch,
IScheduler scheduler) where R : class
有效這是什麼做的是採取在源可觀察並返回一個可觀察的,將匹配其產值每個輸入值。
要獲得每個輸出值,它將首先檢查緩存。如果該值存在於它使用的緩存中。如果不是這樣,它將在不在緩存中的值上旋轉fetch
函數只有。如果所有的值都在緩存中,那麼fetch
函數將永遠不會被啓動 - 所以沒有服務連接設置懲罰等。
我給你的代碼,但它是基於一個稍微不同的版本該擴展方法使用了一個Maybe<T>
單子 - 所以你可能會發現你需要擺弄實現。
這就是:
public static IObservable<R> FromCacheOrFetch<T, R>(this IObservable<T> source, Func<T, R> cache, Func<IObservable<T>, IObservable<R>> fetch, IScheduler scheduler)
where R : class
{
return source.FromCacheOrFetch<T, R>(t => cache(t).ToMaybe(null), fetch, scheduler);
}
public static IObservable<R> FromCacheOrFetch<T, R>(this IObservable<T> source, Func<T, Maybe<R>> cache, Func<IObservable<T>, IObservable<R>> fetch, IScheduler scheduler)
{
var results = new Subject<R>();
var disposables = new CompositeDisposable();
var loop = new EventLoopScheduler();
disposables.Add(loop);
var sourceDone = false;
var pairsDone = true;
var exception = (Exception)null;
var fetchIn = new Subject<T>();
var fetchOut = (IObservable<R>)null;
var pairs = (IObservable<KeyValuePair<int, R>>)null;
var lookup = new Dictionary<T, int>();
var list = new List<Maybe<R>>();
var cursor = 0;
Action checkCleanup =() =>
{
if (sourceDone && pairsDone)
{
if (exception == null)
{
results.OnCompleted();
}
else
{
results.OnError(exception);
}
loop.Schedule(() => disposables.Dispose());
}
};
Action dequeue =() =>
{
while (cursor != list.Count)
{
var mr = list[cursor];
if (mr.HasValue)
{
results.OnNext(mr.Value);
cursor++;
}
else
{
break;
}
}
};
Action<KeyValuePair<int, R>> nextPairs = kvp =>
{
list[kvp.Key] = Maybe<R>.Something(kvp.Value);
dequeue();
};
Action<Exception> errorPairs = ex =>
{
fetchIn.OnCompleted();
pairsDone = true;
exception = ex;
checkCleanup();
};
Action completedPairs =() =>
{
pairsDone = true;
checkCleanup();
};
Action<T> sourceNext = t =>
{
var mr = cache(t);
list.Add(mr);
if (mr.IsNothing)
{
lookup[t] = list.Count - 1;
if (fetchOut == null)
{
pairsDone = false;
fetchOut = fetch(fetchIn.ObserveOn(Scheduler.ThreadPool));
pairs = fetchIn.Select(x => lookup[x]).Zip(fetchOut, (i, r2) => new KeyValuePair<int, R>(i, r2));
disposables.Add(pairs.ObserveOn(loop).Subscribe(nextPairs, errorPairs, completedPairs));
}
fetchIn.OnNext(t);
}
else
{
dequeue();
}
};
Action<Exception> errorSource = ex =>
{
sourceDone = true;
exception = ex;
fetchIn.OnCompleted();
checkCleanup();
};
Action completedSource =() =>
{
sourceDone = true;
fetchIn.OnCompleted();
checkCleanup();
};
disposables.Add(source.ObserveOn(loop).Subscribe(sourceNext, errorSource, completedSource));
return results.ObserveOn(scheduler);
}
使用例子是這樣的:
你必須要獲取指標的來源:
IObservable<X> source = ...
你將有一個函數可以從緩存中獲取值,並且可以將它們放入一個操作(兩者都應該是這樣d安全):
Func<X, Y> getFromCache = x => ...;
Action<X, Y> addToCache = (x, y) => ...;
那麼你將有實際的電話會議,以獲得從您的數據庫或服務的數據:
Func<X, Y> getFromService = x => ...;
則你可以定義fetch
像這樣:
Func<IObservable<X>, IObservable<Y>> fetch =
xs => xs.Select(x =>
{
var y = getFromService(x);
addToCache(x, y);
return y;
});
最後,您可以通過以下方式進行查詢:
IObservable<Y> results =
source.FromCacheOrFetch(
getFromCache,
fetch,
Scheduler.ThreadPool);
當然,您需要訂閱結果才能進行計算。
您確定要同時嘗試兩種方法並選擇首先返回的方法嗎?或者你願意嘗試一下嗎?同時嘗試它們意味着每次都會觸發您的數據服務,無論應用程序是否使用緩存 - 這看起來並不像預期的行爲。 – yamen
@yamen你是完全正確的,這種行爲是無意的,我正在尋找替代這一點。 –
我會在這裏留下這個:https://github.com/github/akavache –