2017-07-14 32 views
2

我正在處理一項服務,該服務負責記錄發送到我們服務的請求。該服務正在脫機工作(正在被解僱和忘記)。 我們正在根據一些輸入參數(產品ID)將請求保存到不同的數據庫。我們不希望每次有人提出請求時都保存到數據庫中 - 我們寧願建立一些「批次」來插入,並且每N時間(比如10秒)執行InsertMany。我已經開始實施,現在我掙扎了大約2件事:C#Concurent dictionary - 鎖定值

  1. 我需要使用ConcurrentDictionary嗎?看起來我會達到相同的正常字典
  2. 如果對上述答案是「不,在你的情況下沒有從ConcurrentDictionary的好處」 - 有沒有辦法重新寫我的代碼「正常」使用ConcurrentDictionary所以我可以避免使用鎖定並確保AddOrUpdate不會與清除批次發生「衝突」?

讓我貼的片段,並進一步解釋:

// dictionary where key is ProductId and value is a list of items to insert to that product database 
    ConcurrentDictionary<string, List<QuoteDetails>> _productDetails; 
    public SaverService(StatelessServiceContext context) 
     : base(context) 
    { 
     _productDetails = new ConcurrentDictionary<string, List<QuoteDetails>>(); 
    } 

    // this function will be fired and forgotten by the external service 
    public async Task SaveRecentRequest(RequestOptions requestData, Response responseData) 
    { 
     await Task.Run(() => { 
      foreach (var token in requestData.ProductAccessTokens) 
      { 
       // this function will extract the specific product request (one request can contain multiple products) 
       var details = SplitQuoteByProduct(requestData, responseData, token); 
       _productDetails.AddOrUpdate(token, new List<QuoteDetails>() { details }, (productId, list) => 
       { 
        list.Add(details); 
        return list; 
       }); 
      } 
     }); 
    } 

    // this function will be executed by a timer every N amount of time 
    public void SaveRequestsToDatabase() 
    { 
     lock (_productDetails) 
     { 
      foreach (var item in _productDetails) 
      { 
       // copy curent items and start a task which will process them 
       SaveProductRequests(item.Key, item.Value.ToList()); 
       // clear curent items 
       item.Value.Clear(); 
      } 
     } 
    } 

    public async Task SaveProductRequests(string productId, List<QuoteDetails> productRequests) 
    { 
     // save received items to database 
     /// ... 
    } 

我主要擔心的是無鎖出現以下情形:

  1. SaveRequestsToDatabase被解僱 - 並開始處理數據
  2. 在撥打item.Value.Clear();SaveRequestsToDatabase函數之前,外部服務會觸發另一個SaveRecentRequest功能,其與相同的密鑰執行AddOrUpdate - 這將增加請求到集合
  3. SaveRequestsToDatabase被整理並因此清除收集 - 但最初目的是通過2中添加不是集合,以便沒有處理在
+0

你真的需要一本字典?在這段代碼中,您可以通過切換到ConcurrentQueue來輕鬆解決所有併發問題,但也許還有其他用法,您不會顯示 –

+0

一種方法是用'ConcurrentBag '替換'List '。 – Evk

回答

3

通常情況下,併發問題通常是由於沒有選擇正確的數據結構。

你的情況,你有兩個工作流程:

  • ñ生產者,排隊事件的同時,不斷
  • 1層的消費者,出隊,並在處理事件的特定時間

您的問題即使不需要,你也試圖將事件歸類爲蝙蝠。將事件保持爲併發部分中的簡單流,並僅將它們排序在消費者部分中,因爲您沒有併發性。

ConcurrentQueue<(string token, QuoteDetails details)> _productDetails; 

public SaverService(StatelessServiceContext context) 
    : base(context) 
{ 
    _productDetails = new ConcurrentQueue<(string, QuoteDetails)>(); 
} 

// this function will be fired and forgotten by the external service 
public async Task SaveRecentRequest(RequestOptions requestData, Response responseData) 
{ 
    await Task.Run(() => { 
     foreach (var token in requestData.ProductAccessTokens) 
     { 
      // this function will extract the specific product request (one request can contain multiple products) 
      var details = SplitQuoteByProduct(requestData, responseData, token); 
      _productDetails.Enqueue((token, details)); 
     } 
    }); 
} 

// this function will be executed by a timer every N amount of time 
public void SaveRequestsToDatabase() 
{ 
    var products = new List<(string token, QuoteDetails details)>(); 

    while (_productDetails.TryDequeue(out var item)) 
    { 
     products.Add(item); 
    } 

    foreach (var group in products.GroupBy(i => i.token, i => i.Details)) 
    { 
     SaveProductRequests(group.Key, group); 
    } 
} 

public async Task SaveProductRequests(string productId, IEnumerable<QuoteDetails> productRequests) 
{ 
    // save received items to database 
    /// ... 
} 
+0

只是注意到您正在使用c#7,它可能不適用於OP –

+0

@BRAHIMKamel好點。但切換到'Tuple'或自定義數據結構應該是微不足道的 –

+0

這是真的,但我更多與@mjwills答案,它使用ConcurrentQueue –

2
ConcurrentDictionary<string, List<QuoteDetails>> _productDetails; 

不會是線程安全的,因爲List不是線程安全的。當一個線程將條目添加到列表中時,另一個線程可能正在迭代它。這最終會失敗。

我會建議使用:

​​3210

或:

ConcurrentDictionary<string, BlockingCollection<QuoteDetails>> _productDetails; 

您還可能能夠完全消除ConcurrentDictionary

1

無論何時添加/刪除/讀取字典,都需要鎖定字典。即使您正忙於處理項目,您當前的代碼也會允許SaveRecentRequest將項目添加到字典中。我建議以下辦法

// dictionary where key is ProductId and value is a list of items to insert to that product database 
Dictionary<string, List<QuoteDetails>> _productDetails; 
public SaverService(StatelessServiceContext context) 
    : base(context) 
{ 
    _productDetails = new Dictionary<string, List<QuoteDetails>>(); 
} 

// this function will be fired and forgotten by the external service 
public async Task SaveRecentRequest(RequestOptions requestData, Response responseData) 
{ 
    await Task.Run(() => { 
     foreach (var token in requestData.ProductAccessTokens) 
     { 
      // this function will extract the specific product request (one request can contain multiple products) 
      var details = SplitQuoteByProduct(requestData, responseData, token); 
      lock(_padlock) 
      { 
       _productDetails.AddOrUpdate(token, new List<QuoteDetails>() { details }, (productId, list) => 
       { 
        list.Add(details); 
        return list; 
       }); 
      } 
     } 
    }); 
} 

// this function will be executed by a timer every N amount of time 
public void SaveRequestsToDatabase() 
{ 
    Dictionary<string, List<QuoteDetails>> offboardingDictionary; 
    lock (_padlock) 
    { 
     offboardingDictionary = _productDetails; 
     _productDetails = new Dictionary<string, List<QuoteDetails>>(); 
    } 

    foreach (var item in offboardingDictionary) 
    { 

     // copy curent items and start a task which will process them 
     SaveProductRequests(item.Key, item.Value.ToList()); 
     // clear curent items 
     item.Value.Clear(); 
    } 
} 

public async Task SaveProductRequests(string productId, List<QuoteDetails> productRequests) 
{ 
    // save received items to database 
    /// ... 
} 

private readonly object _padlock = new object(); 

有了這個,你鎖,當你將項目添加到字典中。爲了提高保存性能,我們爲我們的字典添加了一個新的參考,然後用一個新實例替換原來的一個。通過這種方式,我們可以最大限度地縮短鎖定時間,使新進入的項目可以保存在新字典中,而我們的保存線程將項目從前一個字典中卸載到數據庫。

我不認爲你需要一個併發字典這個任務,一個普通的字典都行,只要你鎖定accessess

+0

我的壞...應鎖定在一個單獨的對象上 – DeadlyEmbrace

+0

https://msdn.microsoft.com/en-us/library/f2090ex9(v=vs.110).aspx也可能是有趣的。 – mjwills

+0

感謝您的回答並解釋我可能仍然無法完成這項工作,因爲AddOrUpdate時沒有鎖定。 – MajkeloDev