2011-09-26 117 views
3

我調用一個worker方法調用數據庫,然後迭代並返回並行處理的值。爲了防止它錘擊數據庫,我有一個Thread.Sleep在那裏暫停執行到數據庫。但是,這似乎阻止了Parallel.ForEach中仍在發生的執行。達到此目的的最佳方法是防止阻塞?Thread.Sleep阻止任務的並行執行

private void ProcessWorkItems() 
{ 
    _cancellation = new CancellationTokenSource(); 
    _cancellation.Token.Register(() => WorkItemRepository.ResetAbandonedWorkItems()); 

    Task.Factory.StartNew(() => 
     Parallel.ForEach(GetWorkItems().AsParallel().WithDegreeOfParallelism(10), workItem => 
     { 
      var x = ItemFactory(workItem); 
      x.doWork(); 
     }), _cancellation.Token); 
} 

private IEnumerable<IAnalysisServiceWorkItem> GetWorkItems() 
{ 
    while (!_cancellation.IsCancellationRequested) 
    { 
     var workItems = WorkItemRepository.GetItemList(); //database call 

     workItems.ForEach(item => 
     { 
      item.QueueWorkItem(WorkItemRepository); 
     }); 

     foreach (var item in workItems) 
     { 
      yield return item; 
     } 

     if (workItems.Count == 0) 
     { 
      Thread.Sleep(30000); //sleep this thread for 30 seconds if no work items. 
     } 
    } 

    yield break; 
} 

編輯: 我改變了它包括的答案,它仍然沒有工作,我很期待。我將.AsParallel()。WithDegreeOfParallelism(10)添加到GetWorkItems()調用中。即使基線正在睡覺,我認爲並行應該繼續執行,我的期望是否不正確?

例如: 我有15個項目,它迭代並抓取10個項目並啓動它們。當每一個完成時,它會從GetWorkItems中請求另一個,直到它嘗試請求第16個項目。此時它應該停止嘗試抓取更多物品,但應該繼續處理物品11-15,直到這些物品完成爲止。這是平行應該如何工作?因爲它目前沒有這樣做。它目前正在做的是當它完成時6,它鎖定後續的10個仍在Parallel.ForEach中運行。

+3

'Thread.Sleep'幾乎從來都不是一個好的解決方案。你能解釋一下你想要完成的事嗎? 'WorkItemRepository.GetItemList'做什麼? –

+2

真正的程序不睡覺()。你正在浪費一個線程並令Task調度器感到沮喪。 –

+0

@Jim Mischel:我很確定'WorkItemRepository.GetItemList'類似於'SELECT * FROM workqueue WHERE status ='unprocessed'',並將它們編譯爲可排隊的工作項類型thingy。 –

回答

7

我建議你創建一個工作項目的BlockingCollection(隊列),並且每30秒調用一次數據庫來填充它。喜歡的東西:

BlockingCollection<WorkItem> WorkItems = new BlockingCollection<WorkItem>(); 

而且在初始化:

System.Threading.Timer WorkItemTimer = new Timer((s) => 
    { 
     var items = WorkItemRepository.GetItemList(); //database call 
     foreach (var item in items) 
     { 
      WorkItems.Add(item); 
     } 
    }, null, 30000, 30000); 

這將查詢每30秒的項目數據庫。

對於安排要處理的工作項目,您有許多不同的解決方案。最近你有什麼會是這樣:

WorkItem item; 

while (WorkItems.TryTake(out item, Timeout.Infinite, _cancellation)) 
{ 
    Task.Factory.StartNew((s) => 
     { 
      var myItem = (WorkItem)s; 
      // process here 
     }, item); 
} 

消除在任何線程阻塞,並讓TPL決定如何最好地分配並行任務。

編輯:

其實更接近你所擁有的是:

foreach (var item in WorkItems.GetConsumingEnumerable(_cancellation)) 
{ 
    // start task to process item 
} 

您可能能夠使用:

Parallel.Foreach(WorkItems.GetConsumingEnumerable(_cancellation).AsParallel ... 

我不知道這是否會工作或多好。也許值得嘗試一下 。 。 。編輯

一般而言

到底有什麼我建議是,你把這個作爲一個生產者/消費者應用程序,與製片人是定期查詢數據庫的新項目的線程。我的示例每N(本例中爲30)秒查詢數據庫一次,如果平均每30秒就可以清空一次您的工作隊列,這將很有效。從項目發佈到數據庫的時間開始,平均延遲時間不會超過一分鐘,直到獲得結果爲止。

您可以減少輪詢頻率(從而減少延遲),但這會導致更多的數據庫流量。

你也可以更喜歡它。例如,如果您在30秒後輪詢數據庫並獲得大量項目,那麼很可能您會很快得到,並且您需要在15秒(或更少)內再次輪詢。相反,如果您在30秒後輪詢數據庫並且什麼也沒有得到,那麼在您再次輪詢之前您可能會等待更長的時間。

您可以使用一次性計時器設置這種自適應輪詢。也就是說,當您創建計時器時,您爲最後一個參數指定-1,這會導致它只觸發一次。你的定時器回調計算出在下一次輪詢之前需要等待多長時間,並呼叫Timer.Change用新值初始化定時器。

+0

我實現了你建議的編輯。但是,計時器似乎不會多次觸發。我將這段代碼從上面添加到我的構造函數和其他一些地方,希望能夠多次觸發它。 'System.Threading.Timer WorkItemTimer' 關於如何讓計時器工作的任何想法?儘管如此,它似乎只是通過'BlockingCollection'工作。感謝那。 –

+0

@ConwayStern:我無法想象爲什麼計時器會失敗多次。在類作用域聲明'WorkItemTimer',然後在構造函數中初始化它。如果你在構造函數中聲明它,也許它正在收集垃圾。 –

+0

這幫助我給了我更多關於搜索內容的想法。以此作爲基本示例[Consumer/Producer using BlockingCollection](http://social.msdn.microsoft.com/Forums/en-AU/parallelextensions/thread/2764e94e-b284-451c-88d8-7e7228c2af71)結束。謝謝。 –

2

您可以使用.WithDegreeOfParallelism()擴展方法強制PLinq同時運行這些任務。在呼叫阻塞或I/O密集部分中有一個很好的例子C# Threading Handbook

+0

這將如何幫助這裏?它在線程的nr上設置了最大值,OP似乎需要最小值。 –

+1

@亨克Holterman,它設置處理器的最大數量,而不是線程。這將是雙核心機器上的兩個。由於在Thread.Sleep調用中不需要CPU時間,因此Microsoft建議'在查詢執行大量非計算限制的工作(如文件I/O)的情況下,指定一個度的並行度大於機器上的內核數量。' – scottm

0

你想用睡覺來完成什麼?從我所知道的情況來看,你正試圖避免數據庫調用的衝擊。我不知道有一個更好的方法可以做到這一點,但理想情況下,您的GetItemList調用會阻止,直到數據可用於處理。

0

你可能會陷入分區人的犯規。

因爲你正在傳遞一個IEnumerable,所以Parallel.ForEach將使用一個塊分區程序,它可以嘗試從一個塊中的枚舉中一次獲取幾個元素。但是你的IEnumerable.MoveNext可以睡覺,這會讓事情變得很糟糕。

您可以編寫自己的分區程序,一次返回一個元素,但無論如何,我認爲像Jim Mischel的建議這樣的生產者/消費者方法會更好。