2014-10-05 47 views
8

天青TableQuery線程安全的我有我已經連續查詢一些基本Azure的表:與Parallel.ForEach

var query = new TableQuery<DynamicTableEntity>() 
    .Where(TableQuery.GenerateFilterCondition("PartitionKey", 
    QueryComparisons.Equal, myPartitionKey)); 

foreach (DynamicTableEntity entity in myTable.ExecuteQuery(query)) { 
    // Process entity here. 
} 

加快這,我這個並行像這樣:

Parallel.ForEach(myTable.ExecuteQuery(query), (entity, loopState) => { 
    // Process entity here in a thread-safe manner. 

    // Edited to add: Details of the loop body below: 

    // This is the essence of the fixed loop body: 
    lock (myLock) { 
    DataRow myRow = myDataTable.NewRow(); 
    // [Add entity data to myRow.] 
    myDataTable.Rows.Add(myRow); 
    } 

    // Old code (apparently not thread-safe, though NewRow() is supposed to create 
    // a DataRow based on the table's schema without changing the table state): 
    /* 
    DataRow myRow = myDataTable.NewRow(); 
    lock (myLock) { 
     // [Add entity data to myRow.] 
     myDataTable.Rows.Add(myRow); 
    } 
    */ 
}); 

這會產生顯着的加速,但結果往往會略有不同(即,某些實體偶爾會有所不同,儘管返回的實體數量完全相同)。

從這個和一些網絡搜索,我得出結論,上面的枚舉並不總是線程安全的。該文檔似乎表明,只有在表格對象是公共靜態的情況下才能保證線程安全,但這對我沒有任何影響。

有人可以建議如何解決這個問題嗎?是否有平行Azure表查詢的標準模式?

+1

枚舉器不必是線程安全的,「Parallel.ForEach()」可以處理該問題。如果實體共享某個狀態,則可能會出現問題。 – svick 2014-10-05 13:37:03

+1

你能澄清一下稍微不同的結果嗎?如果您在Parallel.ForEach中記錄所有實體,您是否按不同的順序獲得同一組實體? – 2014-10-05 16:47:42

+0

我已經對實體進行了排序以確定確切的差異,並且這些集合幾乎完全相同。然而,偶爾會有一個特定實體丟失,而另一個實體被重複(與我連續得到的結果相比,這些結果總是相同的,並且可能包含表格內容的基本事實)。這似乎是一般模式 - 一些實體丟失了,但其他實體卻被複制以保持實體數量相同。就好像某些索引沒有以線程安全的方式遞增,從內存中讀取實體時導致爭用條件。 – 2014-10-06 00:22:06

回答

4

您的評論是正確的:DataTable不適合涉及突變的併發操作,並且是重複條目的來源。鎖定爲行修改操作DataTable對象將解決此問題:

lock (myTable) 
{ 
    DataRow myRow = myTable.NewRow(); 
    myRow.SetField<int>("c1", (int)value); 
    myTable.Rows.Add(myRow); 
} 

把NEWROW()鎖外將間歇性導致在表中重複的行條目或「類型‘System.ArgumentException’未處理的異常的發生在NewRow()行中的「System.Data.dll」異常中。有關併發DataTable用法的更多詳細信息和備選方法,請參見Thread safety for DataTable

要重現錯誤情況,請使用此代碼。一些運行將是乾淨的,一些會包含重複的條目,而有些則會遇到異常。

class Program 
    { 
     static DataTable myTable = GetTable(); 
     static ManualResetEvent waitHandle = new ManualResetEvent(false); 

     static void Main(string[] args) 
     { 
     const int threadCount = 10; 
     List<Thread> threads = new List<System.Threading.Thread>(); 
     for (int i = 0; i < threadCount; ++i) 
     { 
      threads.Add(new Thread(new ParameterizedThreadStart(AddRowThread))); 
      threads[i].Start(i); 
     } 
     waitHandle.Set(); // Release all the threads at once 
     for (int i = 0; i < threadCount; ++i) 
     { 
      threads[i].Join(); 
     } 

     // Print results once threads return 
     for (int i = 0; i < myTable.Rows.Count; ++i) 
     { 
      Console.WriteLine(myTable.Rows[i].Field<int>(0)); 
     } 
     Console.WriteLine("---Processing Complete---"); 
     Console.ReadKey(); 
     } 

     static void AddRowThread(object value) 
     { 
     waitHandle.WaitOne(); 
     DataRow myRow = myTable.NewRow(); // THIS RESULTS IN INTERMITTENT ERRORS 
     lock (myTable) 
     { 
      //DataRow myRow = myTable.NewRow(); // MOVE NewRow() CALL HERE TO RESOLVE ISSUE 
      myRow.SetField<int>("c1", (int)value); 
      myTable.Rows.Add(myRow); 
     } 
     } 

     static DataTable GetTable() 
     { 
     // Here we create a DataTable with four columns. 
     DataTable table = new DataTable(); 
     table.Columns.Add("c1", typeof(int));  
     return table; 
     } 
    }