The bottom of this article描述瞭如何使用GetOrAdd可能會導致(如果我正確理解)損壞/意外的結果。使用「ConcurrentDictionary.GetOrAdd()」時包含Repro代碼避免陳舊(邏輯損壞)數據
剪斷/
ConcurrentDictionary被設計用於多線程 場景。您不必在代碼中使用鎖來從集合中添加或刪除 項。但是,一個 線程總是可以檢索一個值,另一個線程通過爲同一個鍵提供一個新值來立即更新集合 。
另外,儘管ConcurrentDictionary的所有方法都是線程安全的,但並不是所有的方法都是原子的,具體是GetOrAdd和 AddOrUpdate。傳遞給這些方法的用戶委託在字典的內部鎖之外調用 。 (這樣做是爲了 防止未知代碼阻止所有線程。)因此,這是 可能發生的這一系列事件:
1)ThreadA中調用GetOrAdd,沒有發現項目,並創建一個新的項目通過添加 調用valueFactory委託。
2)threadB同時調用GetOrAdd,其valueFactory委託是 調用,並且它到達的ThreadA前的內鎖,並且因此它的 新鍵 - 值對被添加到詞典中。
3)ThreadA中的用戶委託完成,並且線程到達 鎖,但現在看到,該項目已經存在
4)的ThreadA執行「獲取」,並返回以前 添加的數據由threadB。
因此,不能保證由 GetOrAdd返回的數據與由線程的 valueFactory創建的數據相同。 AddOrUpdate 被調用時,可能會發生類似的事件序列。
問題
什麼是驗證數據,再次更新了正確的方法是什麼?一個不錯的方法是根據舊值的內容嘗試/重試此操作的擴展方法。
這將如何實施?我可以依賴結果(verify
)作爲有效的最終狀態,還是必須使用其他方法重試並重新檢索值?更新值時
代碼
下面的代碼有一個爭用條件。期望的行爲是AddOrUpdateWithoutRetrieving()將以不同的方式增加各種值(使用++
或Interlocked.Increment()
)。
我還希望在單個單元中執行多個字段操作,並在以前的更新由於競爭條件而沒有「採取」時重試更新。
運行代碼,您會看到控制檯中出現的每個值都會增加1,但每個值都會漂移,有些會在前後進行一些迭代。
namespace DictionaryHowTo
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
// The type of the Value to store in the dictionary:
class FilterConcurrentDuplicate
{
// Create a new concurrent dictionary.
readonly ConcurrentDictionary<int, TestData> eventLogCache =
new ConcurrentDictionary<int, TestData>();
static void Main()
{
FilterConcurrentDuplicate c = new FilterConcurrentDuplicate();
c.DoRace(null);
}
readonly ConcurrentDictionary<int, TestData> concurrentCache =
new ConcurrentDictionary<int, TestData>();
void DoRace(string[] args)
{
int max = 1000;
// Add some key/value pairs from multiple threads.
Task[] tasks = new Task[3];
tasks[0] = Task.Factory.StartNew(() =>
{
System.Random RandNum = new System.Random();
int MyRandomNumber = RandNum.Next(1, 500);
Thread.Sleep(MyRandomNumber);
AddOrUpdateWithoutRetrieving();
});
tasks[1] = Task.Factory.StartNew(() =>
{
System.Random RandNum = new System.Random();
int MyRandomNumber = RandNum.Next(1, 1000);
Thread.Sleep(MyRandomNumber);
AddOrUpdateWithoutRetrieving();
});
tasks[2] = Task.Factory.StartNew(() =>
{
AddOrUpdateWithoutRetrieving();
});
// Output results so far.
Task.WaitAll(tasks);
AddOrUpdateWithoutRetrieving();
Console.WriteLine("Press any key.");
Console.ReadKey();
}
public class TestData : IEqualityComparer<TestData>
{
public string aStr1 { get; set; }
public Guid? aGud1 { get; set; }
public string aStr2 { get; set; }
public int aInt1 { get; set; }
public long? aLong1 { get; set; }
public DateTime aDate1 { get; set; }
public DateTime? aDate2 { get; set; }
//public int QueryCount { get; set; }
public int QueryCount = 0;//
public string zData { get; set; }
public bool Equals(TestData x, TestData y)
{
return x.aStr1 == y.aStr1 &&
x.aStr2 == y.aStr2 &&
x.aGud1 == y.aGud1 &&
x.aStr2 == y.aStr2 &&
x.aInt1 == y.aInt1 &&
x.aLong1 == y.aLong1 &&
x.aDate1 == y.aDate1 &&
x.QueryCount == y.QueryCount ;
}
public int GetHashCode(TestData obj)
{
TestData ci = (TestData)obj;
// http://stackoverflow.com/a/263416/328397
return
new {
A = ci.aStr1,
Aa = ci.aStr2,
B = ci.aGud1,
C = ci.aStr2,
D = ci.aInt1,
E = ci.aLong1,
F = ci.QueryCount ,
G = ci.aDate1}.GetHashCode();
}
}
private void AddOrUpdateWithoutRetrieving()
{
// Sometime later. We receive new data from some source.
TestData ci = new TestData()
{
aStr1 = "Austin",
aGud1 = new Guid(),
aStr2 = "System",
aLong1 = 100,
aInt1 = 1000,
QueryCount = 0,
aDate1 = DateTime.MinValue
};
TestData verify = concurrentCache.AddOrUpdate(123, ci,
(key, existingVal) =>
{
existingVal.aStr2 = "test1" + existingVal.QueryCount;
existingVal.aDate1 = DateTime.MinValue;
Console.WriteLine
("Thread:" + Thread.CurrentThread.ManagedThreadId +
" Query Count A:" + existingVal.QueryCount);
Interlocked.Increment(ref existingVal.QueryCount);
System.Random RandNum = new System.Random();
int MyRandomNumber = RandNum.Next(1, 1000);
Thread.Sleep(MyRandomNumber);
existingVal.aInt1++;
existingVal.aDate1 =
existingVal.aDate1.AddSeconds
(existingVal.aInt1);
Console.WriteLine(
"Thread:" + Thread.CurrentThread.ManagedThreadId +
" Query Count B:" + existingVal.QueryCount);
return existingVal;
});
// After each run, every value here should be ++ the previous value
Console.WriteLine(
"Thread:"+Thread.CurrentThread.ManagedThreadId +
": Query Count returned:" + verify.QueryCount +
" eid:" + verify.aInt1 + " date:" +
verify.aDate1.Hour + " " + verify.aDate1.Second +
" NAME:" + verify.aStr2
);
}
}
}
輸出
Thread:12: Query Count returned:0 eid:1000 date:0 0 NAME:System
Thread:12 Query Count A:0
Thread:13 Query Count A:1
Thread:12 Query Count B:2
Thread:12: Query Count returned:2 eid:1001 date:0 41 NAME:test11
Thread:12 Query Count A:2
Thread:13 Query Count B:3
Thread:13: Query Count returned:3 eid:1002 date:0 42 NAME:test12
Thread:13 Query Count A:3
Thread:11 Query Count A:4
Thread:11 Query Count B:5
Thread:11: Query Count returned:5 eid:1003 date:0 43 NAME:test14
Thread:11 Query Count A:5
Thread:13 Query Count B:6
Thread:13: Query Count returned:6 eid:1004 date:0 44 NAME:test15
....
Thread:11 Query Count A:658
Thread:11 Query Count B:659
Thread:11: Query Count returned:659 eid:1656 date:0 36 NAME:test1658
Thread:11 Query Count A:659
Thread:11 Query Count B:660
Thread:11: Query Count returned:660 eid:1657 date:0 37 NAME:test1659
Thread:11 Query Count A:660
Thread:11 Query Count B:661
Thread:11: Query Count returned:661 eid:1658 date:0 38 NAME:test1660
Thread:11 Query Count A:661
Thread:11 Query Count B:662
Thread:11: Query Count returned:662 eid:1659 date:0 39 NAME:test1661
在此代碼 「EID」 應該永遠是1000多查詢的次數,但在迭代的不同而變化兩者之間從1到7。這種不一致可能會導致某些應用程序失敗或報告不正確的數據。
''ConcurrentDictionary'對於它自己的不變式只是線程安全的。即它不會破壞它自己的數據。如果你有其他的不變量,它不可能事先知道它們,或者有望補償它們。您需要一致地定義您的不變量,並且保護作爲具有某種線程同步的事務。 –