2012-07-06 67 views
2

是否存在將並行和並行線程安全計算組合的模式?並行和線程安全串行模式

需要計算第一步將從並行中受益的結果,第二步是對並行結果的串行處理。

一種選擇是運行並行並將輸出保存到集合,然後對集合進行連續處理,然後運行。由於集合可能非常大,因此存在內存管理問題。

以下是系列版本。基本上我想平行TableQueryGetRowKeys並以線程安全的方式使用該結果。試圖只是平行的並鎖定最終結果,但rowKeys可能會關閉。嘗試聚合,但我想不出如何將集合傳遞給聚合,更不用說在聚合中執行線程安全相交。

IEnumerable<string> finalResults = null; 
if (partitionKey.Length == 0) return finalResults; 
object lockObject = new object(); 
finalResults = TableQueryGetRowKeys(partitionKey[0], 0); 
HashSet<string> rowKeys; 
for(int i = 1; i < partitionKey.Length; i++) 
{ 
    // IO operation to Azure Table Storage against the PartitionKey 
    // so very amenable to parallel 
    rowKeys = TableQueryGetRowKeys(partitionKey[i]); 
    // a memory and CPU operation 
    // this should be much faster than TableQueryGetRowKeys 
    // going parallel and wrapping this in a lock did not properly synch rowKeys 
    finalResults = finalResults.Intersect(rowKeys); 
} 
return finalResults; 
+0

你能更詳細地描述你的問題嗎?如在,你如何處理你的數據? – Wug 2012-07-06 17:00:50

+0

在此代碼中TableQueryGetRowKeys只是一個虛擬例程。在現實生活中,它將是對PartitionKey上的Azure表存儲的查詢並返回RowKeys。 Interset將獲得RowKeys公共集合中的一組PartitionKeys。 – Paparazzi 2012-07-06 19:31:52

回答

2

假設TableQueryGetRowKeys是線程安全:

var final = partitionKey.AsParallel() 
         // By returning AsParallel we can get parallel intersect 
         .Select(k => TableQueryGetRowKeys(k).AsParallel()) 
         .Aggregate((x, y) => x.Intersect(y)); 

// Using fake-ish data I see about a 30% speed-up on a 4-core machine: 
// static HashSet<string> TableQueryGetRowKeys(string prefix) 
// { 
//  // Simulate 1s of IO round-trip 
//  if (useSleep) Thread.Sleep(1000); 
// 
//  return new HashSet<string>(
//   Enumerable.Range(0, 500) 
//     .Select(_ => random.Value.Next(0, 500).ToString())); 
// } 

在逐步的方式,該算法的工作原理是這樣:

  1. partitionKey.AsParallel()接通定期IEnumerable<string>ParallelQuery<string>其允許的並行處理序列。
  2. 接下來,ParallelEnumerable.Select用於並行呼叫TableQueryGetRowKeys
  3. 然後使用AsParallel()將每次致電TableQueryGetRowKeys的結果包裝在ParallelQuery<T>中。
  4. ParallelEnumerable.Intersect用作TableQueryGetRowKeys返回的每個「並行啓用」枚舉的聚合函數。

實際上,這可以在串行使用通過去除AsParallel電話,以取代以前的代碼,像這樣:

var serialEquivalent = partitionKey.Select(k => TableQueryGetRowKeys(k)) 
            .Aggregate((x,y) => x.Intersect(y)); 

您可以「說服」自己,這是相當於你的方法,當你看看你的實施肉和土豆:

IEnumerable<string> results = SomeMethod(0); 
for (int ii = 1; ii < count; ++ii) 
{ 
    results = results.Intersect(SomeMethod(ii)); 
} 

重寫上述使用的+代替Intersect

int results = SomeMethod(0); 
for (int ii = 1; ii < count; ++ii) 
{ 
    results = results + SomeMethod(ii); 
} 

立即變得清楚的是Intersect來代替其他更「普通的」聚合函數可以使用(例如數學運算符)。

+0

讓我做一些測試。它太漂亮了,我甚至都不懂。 .Aggregate((x,y)=> x.Intersect(y))是x .Select的結果,y是聚合的變量? – Paparazzi 2012-07-06 20:25:17

+0

我已經使用了[源類型也是累加器類型]的重載(http://msdn.microsoft.com/zh-cn/library/dd383981)。鑑於'Select'返回的是自己枚舉的「rows」,累加器將是一個枚舉。如果你認爲像'Sum'(或'+')這樣的'Intersect',這會更有意義。 – user7116 2012-07-06 20:52:01

+0

謝謝,還沒有完全理解它。但是我進行了足夠多的測試,我對這個答案感到滿意。我仍然沒有得到x,y。一個支持並行的枚舉如何轉化爲兩個變量? – Paparazzi 2012-07-06 23:30:33