2014-02-07 90 views
16

我正在尋找一種優雅的方式來緩存我的異步操作的結果。緩存異步操作

我第一次碰到這樣的同步方法:

public String GetStuff(String url) 
{ 
    WebRequest request = WebRequest.Create(url); 
    using (var response = request.GetResponse()) 
    using (var sr = new StreamReader(response.GetResponseStream())) 
     return sr.ReadToEnd(); 
} 

然後我做了異步:

public async Task<String> GetStuffAsync(String url) 
{ 
    WebRequest request = WebRequest.Create(url); 
    using (var response = await request.GetResponseAsync()) 
    using (var sr = new StreamReader(response.GetResponseStream())) 
     return await sr.ReadToEndAsync(); 
} 

於是我決定,我應該緩存的結果,所以我並不需要查詢外面那往往:

ConcurrentDictionary<String, String> _cache = new ConcurrentDictionary<String, String>(); 

public async Task<String> GetStuffAsync(String url) 
{ 
    return _cache.GetOrAdd(url, await GetStuffInternalAsync(url)); 
} 

private async Task<String> GetStuffInternalAsync(String url) 
{ 
    WebRequest request = WebRequest.Create(url); 
    using (var response = await request.GetResponseAsync()) 
    using (var sr = new StreamReader(response.GetResponseStream())) 
     return await sr.ReadToEndAsync(); 
} 

然後,我讀了一篇文章(o看了一個視頻)關於如何cachi NG Task<T>是更好的,因爲他們創造昂貴:

ConcurrentDictionary<String, Task<String>> _cache = new ConcurrentDictionary<String, Task<String>>(); 

public Task<String> GetStuffAsync(String url) 
{ 
    return _cache.GetOrAdd(url, GetStuffInternalAsync(url)); 
} 

private async Task<String> GetStuffInternalAsync(String url) 
{ 
    WebRequest request = WebRequest.Create(url); 
    using (var response = await request.GetResponseAsync()) 
    using (var sr = new StreamReader(response.GetResponseStream())) 
     return await sr.ReadToEndAsync(); 
} 

而現在的問題是,如果請求失敗(例如:一個HTTP 401),高速緩存將包含一個失敗Task<String>,我將不得不重新設置該應用程序,因爲它將不可能重新發送請求。

是否有一種優雅的方式使用ConcurrentDictionary<T1,T2>緩存只成功的任務,仍然有原子行爲?

+3

操作系統已經爲您緩存了HTTP請求,並且會正確地遵循HTTP語義(即不緩存401)。 –

+0

https://stackoverflow.com/questions/31831860/async-threadsafe-get-from-memorycache –

回答

16

首先,您的方法都是錯誤的,因爲它們不會節省您的任何請求(儘管第二個方法至少可以節省您的時間)。

你的第一個代碼(帶有await)做到這一點:

  1. 使該請求。
  2. 等待請求完成。
  3. 如果緩存中已經有結果,則忽略請求的結果。

您的第二個代碼刪除了第2步,因此速度更快,但您仍然有大量不必要的請求。

你應該做的,而不是爲使用the overload of GetOrAdd() that takes a delegate

public Task<String> GetStuffAsync(String url) 
{ 
    return _cache.GetOrAdd(url, GetStuffInternalAsync); 
} 

這並不能完全消除被忽略請求的可能性,但它確實使他們不太可能。 (對於這一點,你可以嘗試取消,你知道被忽視的請求,但我不認爲這是值得的努力在這裏。)


現在您的實際問題。我認爲你應該做的是使用the AddOrUpdate() method。如果該值尚未存在,則添加它。如果它的存在,如果它出現故障更換:

public Task<String> GetStuffAsync(String url) 
{ 
    return _cache.AddOrUpdate(
     url, GetStuffInternalAsync, (u, task) => 
     { 
      if (task.IsCanceled || task.IsFaulted) 
       return GetStuffInternalAsync(u); 
      return task; 
     }); 
} 
+0

我花了5分鐘來理解你的第一段,直到我看到我的錯誤。在原始代碼中,我有lambda表達式。 AddOrUpdate可能是我需要的東西,我之前檢查過它,但是在閱讀這個小文檔後我並沒有感到非常自信。感謝這個例子。 – vtortola

+2

請注意,如果多個線程在同一時間附近執行'GetStuffAsync','GetStuffInternalAsync'可能會執行多次。 ConcurrentDictionary是線程安全的,但在調用回調委託時不會同步。 –

+0

@ChrisEldredge是的,我提到:「這並不能完全消除被忽略的請求的可能性,但它確實使它們不太可能。」 – svick

7

它實際上是合理的(並根據您的設計和性能,關鍵),以保持這些失敗的任務作爲Negative Cache。否則,如果一個url總是失敗,一次又一次地使用它會破壞整個使用緩存的點。

你需要的是一種不時清除緩存的方法。最簡單的方法是用一個定時器替代ConcurrentDictionarry實例。更強大的解決方案是建立自己的LruDictionary或類似的東西。

+3

實際上,我認爲最簡單的方法是使用'MemoryCache',一段時間後它已經支持從緩存清除值。 – svick

+0

@svick雖然我不確定它支持那些原子操作。 – i3arnon

+0

它有['AddOrGetExisting()'](http://msdn.microsoft.com/en-us/library/system.runtime.caching.memorycache.addorgetexisting),我認爲這就夠了。 – svick

0

對我來說這項工作:

ObjectCache _cache = MemoryCache.Default; 
static object _lockObject = new object(); 
public Task<T> GetAsync<T>(string cacheKey, Func<Task<T>> func, TimeSpan? cacheExpiration = null) where T : class 
{ 
    var task = (T)_cache[cacheKey]; 
    if (task != null) return task;   
    lock (_lockObject) 
    { 
     task = (T)_cache[cacheKey](cacheKey); 
     if (task != null) return task; 
     task = func(); 
     Set(cacheKey, task, cacheExpiration); 
     task.ContinueWith(t => { 
      if (t.Status != TaskStatus.RanToCompletion) 
       _cache.Remove(cacheKey); 
     }); 
    } 
    return task; 
} 
1

這裏有一個辦法,保證沒有高速緩存未命中的異步操作的緩存結果。

正如在接受的答案的評論中所提到的,如果多次在循環中(取決於SynchronizationContext)或從多個線程請求相同的url,那麼web請求會一直髮送出去,直到有一個緩存的響應,此時緩存將開始使用。

以下方法爲每個唯一密鑰創建一個SemaphoreSlim對象。這將防止長時間運行的異步操作對同一個密鑰運行多次,同時允許它同時針對不同的密鑰運行。顯然,爲了防止緩存未命中,保留SemaphoreSlim對象的開銷很大,所以根據用例它可能不值得。但是,如果保證沒有緩存丟失是重要的,否則這會實現這一點。

private readonly ConcurrentDictionary<string, SemaphoreSlim> _keyLocks = new ConcurrentDictionary<string, SemaphoreSlim>(); 
private readonly ConcurrentDictionary<string, string> _cache = new ConcurrentDictionary<string, string>(); 

public async Task<string> GetSomethingAsync(string key) 
{ 
    string value; 
    // get the semaphore specific to this key 
    var keyLock = _keyLocks.GetOrAdd(key, x => new SemaphoreSlim(1)); 
    await keyLock.WaitAsync(); 
    try 
    { 
     // try to get value from cache 
     if (!_cache.TryGetValue(key, out value)) 
     { 
      // if value isn't cached, get it the long way asynchronously 
      value = await GetSomethingTheLongWayAsync(); 

      // cache value 
      _cache.TryAdd(key, value); 
     } 
    } 
    finally 
    { 
     keyLock.Release(); 
    } 
    return value; 
} 
+1

您應該使用接受委託的'GetOrAdd'重載,以免不斷創建不需要的信號量。 – Servy

+0

@Servy好點。更新了示例 – Brandon

0

另一種簡單的方法來做到這一點是延長Lazy<T>AsyncLazy<T>,就像這樣:

public class AsyncLazy<T> : Lazy<Task<T>> 
{ 
    public AsyncLazy(Func<Task<T>> taskFactory, LazyThreadSafetyMode mode) : 
     base(() => Task.Factory.StartNew(() => taskFactory()).Unwrap(), mode) 
    { } 

    public TaskAwaiter<T> GetAwaiter() { return Value.GetAwaiter(); } 
} 

然後,你可以這樣做:

private readonly ConcurrentDictionary<string, AsyncLazy<string>> _cache 
    = new ConcurrentDictionary<string, AsyncLazy<string>>(); 

public async Task<string> GetStuffAsync(string url) 
{ 
    return await _cache.GetOrAdd(url, 
     new AsyncLazy<string>(
      () => GetStuffInternalAsync(url), 
      LazyThreadSafetyMode.ExecutionAndPublication)); 
} 
+0

我希望得到一些反饋意見。 – Enigmativity

1

我已經爲一個包裝MemoryCache基本上緩存了Lazy<Task<T>>對象,並且可以解決以下所有問題:

  • 沒有並行或不必要的操作來獲取值將開始。多個調用站點或線程可以等待緩存中的相同值。
  • 失敗的任務沒有被緩存。 (無負緩存)。
  • 緩存用戶無法從緩存中獲取無效結果,即使該值在等待期間失效。

該解決方案在my blog中有進一步說明,完整的工作代碼可在GitHub處獲得。