5

The bottom of this article描述瞭如何使用GetOrAdd可能會導致(如果我正確理解)損壞/意外的結果。使用「ConcurrentDictionary.GetOrAdd()」時包含Repro代碼避免陳舊(邏輯損壞)數據

剪斷/

ConcurrentDictionary被設計用於多線程 場景。您不必在代碼中使用鎖來從集合中添加或刪除 項。但是,一個 線程總是可以檢索一個值,另一個線程通過爲同一個鍵提供一個新值來立即更新集合 。

另外,儘管ConcurrentDictionary的所有方法都是線程安全的,但並不是所有的方法都是原子的,具體是GetOrAdd和 AddOrUpdate。傳遞給這些方法的用戶委託在字典的內部鎖之外調用 。 (這樣做是爲了 防止未知代碼阻止所有線程。)因此,這是 可能發生的這一系列事件:

1)ThreadA中調用GetOrAdd,沒有發現項目,並創建一個新的項目通過添加 調用valueFactory委託。

2)threadB同時調用GetOrAdd,其valueFactory委託是 調用,並且它到達的ThreadA前的內鎖,並且因此它的 新鍵 - 值對被添加到詞典中。

3)ThreadA中的用戶委託完成,並且線程到達 鎖,但現在看到,該項目已經存在

4)的ThreadA執行「獲取」,並返回以前 添加的數據由threadB。

因此,不能保證由 GetOrAdd返回的數據與由線程的 valueFactory創建的數據相同。 AddOrUpdate 被調用時,可能會發生類似的事件序列。

問題

什麼是驗證數據,再次更新了正確的方法是什麼?一個不錯的方法是根據舊值的內容嘗試/重試此操作的擴展方法。

這將如何實施?我可以依賴結果(verify)作爲有效的最終狀態,還是必須使用其他方法重試並重新檢索值?更新值時

代碼

下面的代碼有一個爭用條件。期望的行爲是AddOrUpdateWithoutRetrieving()將以不同的方式增加各種值(使用++Interlocked.Increment())。

我還希望在單個單元中執行多個字段操作,並在以前的更新由於競爭條件而沒有「採取」時重試更新。

運行代碼,您會看到控制檯中出現的每個值都會增加1,但每個值都會漂移,有些會在前後進行一些迭代。

namespace DictionaryHowTo 
{ 
    using System; 
    using System.Collections.Concurrent; 
    using System.Collections.Generic; 
    using System.Linq; 
    using System.Text; 
    using System.Threading; 
    using System.Threading.Tasks; 

    // The type of the Value to store in the dictionary: 
    class FilterConcurrentDuplicate 
    { 
     // Create a new concurrent dictionary. 
     readonly ConcurrentDictionary<int, TestData> eventLogCache = 
      new ConcurrentDictionary<int, TestData>(); 

     static void Main() 
     { 
      FilterConcurrentDuplicate c = new FilterConcurrentDuplicate(); 

      c.DoRace(null); 
     } 

     readonly ConcurrentDictionary<int, TestData> concurrentCache = 
      new ConcurrentDictionary<int, TestData>(); 
     void DoRace(string[] args) 
     { 
      int max = 1000; 

      // Add some key/value pairs from multiple threads. 
      Task[] tasks = new Task[3]; 

      tasks[0] = Task.Factory.StartNew(() => 
      { 

       System.Random RandNum = new System.Random(); 
       int MyRandomNumber = RandNum.Next(1, 500); 

       Thread.Sleep(MyRandomNumber); 
       AddOrUpdateWithoutRetrieving(); 

      }); 

      tasks[1] = Task.Factory.StartNew(() => 
      { 
       System.Random RandNum = new System.Random(); 
       int MyRandomNumber = RandNum.Next(1, 1000); 

       Thread.Sleep(MyRandomNumber); 

       AddOrUpdateWithoutRetrieving(); 

      }); 

      tasks[2] = Task.Factory.StartNew(() => 
      { 
       AddOrUpdateWithoutRetrieving(); 

      }); 
      // Output results so far. 
      Task.WaitAll(tasks); 

      AddOrUpdateWithoutRetrieving(); 

      Console.WriteLine("Press any key."); 
      Console.ReadKey(); 
     } 
     public class TestData : IEqualityComparer<TestData> 
     { 
      public string aStr1 { get; set; } 
      public Guid? aGud1 { get; set; } 
      public string aStr2 { get; set; } 
      public int aInt1 { get; set; } 
      public long? aLong1 { get; set; } 

      public DateTime aDate1 { get; set; } 
      public DateTime? aDate2 { get; set; } 

      //public int QueryCount { get; set; } 
      public int QueryCount = 0;// 

      public string zData { get; set; } 
      public bool Equals(TestData x, TestData y) 
      { 
       return x.aStr1 == y.aStr1 && 
        x.aStr2 == y.aStr2 && 
         x.aGud1 == y.aGud1 && 
         x.aStr2 == y.aStr2 && 
         x.aInt1 == y.aInt1 && 
         x.aLong1 == y.aLong1 && 
         x.aDate1 == y.aDate1 && 
         x.QueryCount == y.QueryCount ; 
      } 

      public int GetHashCode(TestData obj) 
      { 
       TestData ci = (TestData)obj; 
       // http://stackoverflow.com/a/263416/328397 
       return 
        new { 
         A = ci.aStr1, 
         Aa = ci.aStr2, 
         B = ci.aGud1, 
         C = ci.aStr2, 
         D = ci.aInt1, 
         E = ci.aLong1, 
         F = ci.QueryCount , 
         G = ci.aDate1}.GetHashCode(); 
      } 
     } 
     private void AddOrUpdateWithoutRetrieving() 
     { 
      // Sometime later. We receive new data from some source. 
      TestData ci = new TestData() 
      { 
       aStr1 = "Austin", 
       aGud1 = new Guid(), 
       aStr2 = "System", 
       aLong1 = 100, 
       aInt1 = 1000, 
       QueryCount = 0, 
       aDate1 = DateTime.MinValue 
      }; 

      TestData verify = concurrentCache.AddOrUpdate(123, ci, 
       (key, existingVal) => 
       { 
        existingVal.aStr2 = "test1" + existingVal.QueryCount; 
        existingVal.aDate1 = DateTime.MinValue; 
        Console.WriteLine 
        ("Thread:" + Thread.CurrentThread.ManagedThreadId + 
          " Query Count A:" + existingVal.QueryCount); 
        Interlocked.Increment(ref existingVal.QueryCount); 
        System.Random RandNum = new System.Random(); 
        int MyRandomNumber = RandNum.Next(1, 1000); 

        Thread.Sleep(MyRandomNumber); 
        existingVal.aInt1++; 
        existingVal.aDate1 = 
         existingVal.aDate1.AddSeconds 
         (existingVal.aInt1); 
        Console.WriteLine(
          "Thread:" + Thread.CurrentThread.ManagedThreadId + 
          " Query Count B:" + existingVal.QueryCount); 
        return existingVal; 
       }); 


      // After each run, every value here should be ++ the previous value 
      Console.WriteLine(
       "Thread:"+Thread.CurrentThread.ManagedThreadId + 
       ": Query Count returned:" + verify.QueryCount + 
       " eid:" + verify.aInt1 + " date:" + 
       verify.aDate1.Hour + " " + verify.aDate1.Second + 
       " NAME:" + verify.aStr2 
       ); 
     } 

    } 
} 

輸出

Thread:12: Query Count returned:0 eid:1000 date:0 0 NAME:System 

Thread:12 Query Count A:0 
Thread:13 Query Count A:1 
Thread:12 Query Count B:2 
Thread:12: Query Count returned:2 eid:1001 date:0 41 NAME:test11 

Thread:12 Query Count A:2 
Thread:13 Query Count B:3 
Thread:13: Query Count returned:3 eid:1002 date:0 42 NAME:test12 

Thread:13 Query Count A:3 
Thread:11 Query Count A:4 
Thread:11 Query Count B:5 
Thread:11: Query Count returned:5 eid:1003 date:0 43 NAME:test14 

Thread:11 Query Count A:5 
Thread:13 Query Count B:6 
Thread:13: Query Count returned:6 eid:1004 date:0 44 NAME:test15 

....

Thread:11 Query Count A:658 
Thread:11 Query Count B:659 
Thread:11: Query Count returned:659 eid:1656 date:0 36 NAME:test1658 

Thread:11 Query Count A:659 
Thread:11 Query Count B:660 
Thread:11: Query Count returned:660 eid:1657 date:0 37 NAME:test1659 

Thread:11 Query Count A:660 
Thread:11 Query Count B:661 
Thread:11: Query Count returned:661 eid:1658 date:0 38 NAME:test1660 

Thread:11 Query Count A:661 
Thread:11 Query Count B:662 
Thread:11: Query Count returned:662 eid:1659 date:0 39 NAME:test1661 

在此代碼 「EID」 應該永遠是1000多查詢的次數,但在迭代的不同而變化兩者之間從1到7。這種不一致可能會導致某些應用程序失敗或報告不正確的數據。

+0

''ConcurrentDictionary'對於它自己的不變式只是線程安全的。即它不會破壞它自己的數據。如果你有其他的不變量,它不可能事先知道它們,或者有望補償它們。您需要一致地定義您的不變量,並且保護作爲具有某種線程同步的事務。 –

回答

4

這份書面材料是根據在文章底部的言論不正確的認識:http://msdn.microsoft.com/en-us/library/dd997369.aspx和基本的併發性錯誤「如何從ConcurrentDictionary添加和刪除項目」 - 的併發非原子修改共享對象。

首先,讓我們來澄清鏈接文章的真實含義。我將以AddOrUpdate爲例,但GetOrAdd的推理是等價的。

說,你從幾個線程調用AddOrUpdate並指定相同的鍵。假定具有該密鑰的條目已經存在。 每個線程都會出現,請注意,已經存在具有指定鍵的條目,並且AddOrUpdate的更新部分是相關的。 這樣做,沒有線程會鎖定字典。相反,它會使用一些互鎖指令來自動檢查入口鍵是否存在。

因此,我們的幾個線程都注意到鍵存在,並且需要調用updateValueFactory。該委託被傳遞給AddOrUpdate;它會引用現有的鍵和值並返回更新值。現在,所涉及的所有線程將同時調用工廠。它們都將以先前未知的順序完成,並且每個線程都會嘗試使用原子操作(使用互鎖指令)以用剛剛計算的值替換現有值。沒有辦法知道哪個線程會「贏」。獲勝的線程將獲得存儲其計算值。其他人會注意到字典中的值不再是作爲參數傳入updateValueFactory的值。爲了迴應這種認識,他們將放棄手術並丟棄剛剛計算的數值。這正是你想要發生的事情。

接下來,讓我們闡明你爲什麼跑這裏列出的代碼示例時得到奇怪的值:

回想一下,傳遞給AddOrUpdate的updateValueFactory代表需要參考現有的鍵和值,並返回更新值。 其AddOrUpdateWithoutRetrieving()方法中的代碼示例開始直接對該引用執行操作。它不是創建一個新的替換值並修改THAT,而是修改existingVal的實例成員值 - 一個已經在字典中的對象 - 然後簡單地返回該引用。而且它不是原子性的 - 它讀取一些值,更新一些值,讀取更多,更新更多。當然,我們在上面已經看到,這在同時發生在多個線程上 - 它們都修改SAME對象。難怪結果是在任何時候(代碼示例調用WriteLine時),對象都包含源自不同線程的成員實例值。

字典與此無關 - 代碼只是簡單地修改線程之間非自動地共享的對象。這是最常見的併發錯誤之一。兩種最常見的解決方法取決於場景。使用共享鎖來使整個對象修改爲原子,或者先原子複製整個對象,然後修改本地副本。

對於後者,你可以添加以下到TESTDATA類:

private Object _copyLock = null; 

private Object GetLock() { 

    if (_copyLock != null) 
     return _copyLock; 

    Object newLock = new Object(); 
    Object prevLock = Interlocked.CompareExchange(ref _copyLock, newLock, null); 
    return (prevLock == null) ? newLock : prevLock; 
} 

public TestData Copy() { 

    lock (GetLock()) { 
     TestData copy = new TestData(); 
     copy.aStr1 = this.aStr1; 
     copy.aStr2 = this.aStr2; 
     copy.aLong1 = this.aLong1; 
     copy.aInt1 = this.aInt1; 
     copy.QueryCount = this.QueryCount; 
     copy.aDate1 = this.aDate1; 
     copy.aDate2 = this.aDate2; 
     copy.zData = this.zData; 

     return copy; 
    } 
} 

然後修改出廠如下:

TestData verify = concurrentCache.AddOrUpdate(123, ci, 
    (key, existingVal) => 
    { 
     TestData newVal = existingVal.Copy(); 
     newVal.aStr2 = "test1" + newVal.QueryCount; 
     newVal.aDate1 = DateTime.MinValue; 
     Console.WriteLine("Thread:" + Thread.CurrentThread.ManagedThreadId + " Query Count A:" + newVal.QueryCount); 
     Interlocked.Increment(ref newVal.QueryCount); 
     System.Random RandNum = new System.Random(); 
     int MyRandomNumber = RandNum.Next(1, 1000); 

     Thread.Sleep(MyRandomNumber); 
     newVal.aInt1++; 
     newVal.aDate1 = newVal.aDate1.AddSeconds(newVal.aInt1); 
     Console.WriteLine("Thread:" + Thread.CurrentThread.ManagedThreadId + " Query Count B:" + newVal.QueryCount); 
     return newVal; 
    }); 

我希望這有助於。

3

或許正確的方法是不在乎返回的值是不是valueFactory創建的值。如果這是不可接受的,你需要使用鎖。

+0

請參閱附件中的代碼...我認爲需要使用C#擴展方法來嘗試...重試更新直到它保持一致。那或者合併自旋鎖。不關心是不是答案IMO – LamonteCristo

+0

只要valueFactory沒有副作用(這很可能是一個好主意),它通常應該不重要。 – erikkallen

2

總是沒有一般的保護。但常見的解決方法是返回Lazy<T>而不是T。這種方式創造不需要的懶惰沒有壞處,因爲永遠不會開始。只有一個Lazy會使它成爲對應於該鍵的最終值。只有一個特定的Lazy實例會被返回。

+0

您是否知道我如何在解決此問題的擴展方法中實現此功能? – LamonteCristo

+0

http://blogs.msdn.com/b/pfxteam/archive/2010/04/23/10001621.aspx – usr

1

您可以使用GetOrAddthis implementation。 請注意,即使在這裏工廠可以被調用,而不會將其結果添加到字典中。但是你會看到發生了什麼。

+0

是的,一個包裝GetOrAdd的擴展函數就是我正在尋找的。我有麻煩創建一個重載,如果發生損壞時會自動重試GetOrAdd。 – LamonteCristo