3

我有一個文件列表,其中每個文件包含一個Foo數據列表。現在,同一段Foo數據(例如Id = 1)可能存在於多個文件中,但最近的一段數據會覆蓋現有數據。如何在Parallel.ForEach期間添加或更新此.NET集合?

我只是將每段數據讀入內存集合中。

if !cache.HasKey(foo.Id) then Add  
else cache[foo.Id].UpdatedOn < foo.UpdatedOn then Update 
else do nothing 

當我在文件正在讀(原因有幾個EM的),我還使用Parallel.ForEach(files, file => { .. });

我不知道我是如何做到這一點。

我正在考慮使用ConcurrentDictionary但我不確定如何做AddOrUpdate其中條款thingy。

有什麼建議嗎?

回答

4

可以使用ConcurrentDictionary,像這樣:

dictionary.AddOrUpdate(foo.Id, foo, (id, existing) => 
    existing.UpdatedOn < foo.UpdatedOn ? foo : existing); 

由於在下面的意見的討論中,我將解釋爲什麼這裏沒有競爭條件。 This MSDN本文討論如何值工廠運行,並提到:

因此,不能保證由GetOrAdd返回的數據是由線程的創建valueFactory相同的數據。

這是有道理的,因爲併發字典的設計者不想讓用戶代碼鎖定字典誰知道多久,使其無用。相反,AddOrUpdate所做的是在兩個嵌套循環中運行。下面是一些僞代碼:

do { 
    while (!TryGetValue(key, out value)) 
     if (TryAdd(key, addValue)) return; 
    newValue = updateValueFactory(key, value); 
} while (TryUpdate(key, newValue, value)); 

TryUpdate獲取用於特定桶鎖,當前值進行比較檢索到的值,只有如果匹配執行更新。如果失敗,則外循環再次發生,TryGetValue返回最新值,再次調用值工廠,依此類推。

因此,如果更新成功,可以確保值工廠始終具有最新值。

+0

存在爭用條件 - 以預防,與'鎖(foo.Id)包住上述代碼' block:「ConcurrentDictionary 是爲多線程場景設計的,你不必在你的代碼中使用鎖來添加或刪除集合中的項目,但是,一個線程總是可以檢索一個值,而另一個線程線程來立即通過賦予相同的關鍵值一個新的值來更新集合。「從MSDN「如何:從ConcurrentDictionary添加和刪除項目」 – Moho

+0

@Moho,什麼競爭條件? – Grzenio

+0

有可能兩個線程具有相同的'foo。Id'鍵值同時執行'AddOrUpdate'方法。 – Moho

0

有趣的行爲在ConcurrentDictionary.AddOrUpdate方法:

class Program 
{ 
    static void Main(string[] args) 
    { 
     var cd = new System.Collections.Concurrent.ConcurrentDictionary<int, int>(); 

     var a = 0; 
     var b = 1; 
     var c = 2; 

     cd[ 1 ] = a; 

     Task.WaitAll(
      Task.Factory.StartNew(() => cd.AddOrUpdate(1, b, (key, existingValue) => 
       { 
        Console.WriteLine("b"); 
        if(existingValue < b) 
        { 
         Console.WriteLine("b update"); 
         System.Threading.Thread.Sleep(2000); 
         return b; 
        } 
        else 
        { 
         Console.WriteLine("b no change"); 
         return existingValue; 
        } 
       })), 

      Task.Factory.StartNew(() => cd.AddOrUpdate(1, c, (key, existingValue) => 
      { 
       Console.WriteLine("c start"); 
       if(existingValue < c) 
       { 
        Console.WriteLine("c update"); 
        return c; 
       } 
       else 
       { 
        Console.WriteLine("c no change"); 
        return existingValue; 
       } 
      }))); 

     Console.WriteLine("Value: {0}", cd[ 1 ]); 

     var input = Console.ReadLine(); 
    } 
} 

結果:

ConcurrentDictionary.AddOrUpdate Test Output

+1

如果您還解釋了結果的含義,我認爲這會有所幫助。 – svick

相關問題