2012-07-24 92 views
0

我正在寫一個針對SQL Server的輪詢系統。當檢索任務時,使用Task.Factory.StartNew()將它們卸載到新線程。整個事情是在一個無限循環內,我想只允許N個併發任務。跟蹤併發任務

while(true) 
{ 
    // create connection 

    while(_concurrentTasks < _allowedTasks) 
    { 
     // get job info 

     if(!reader.HasRows) break; // no work to do. 

     while(reader.Read()) 
     { 
      // fill JobInfo entity from reader 

      _taskCollection.Add(Task.Factory.StartNew(() => ProcessJob(jobInfo))); 

      _concurrentTasks++; 
     } 
    } 

    // could break for 1 of 2 reasons, threshold met or nothing to do 
    if((_concurrentTasks < _allowedTasks) 
    { 
     // sleep for a predetermined period of time 
     Thread.Sleep(60000); 
    } 
    else 
    { 
     // wait for one thread to complete and then get more jobs 
     Task.WaitAny(_taskCollection.ToArray); 
    } 
} 

我不確定在這種情況下最好使用哪個集合或如何清理已完成的任務。

如果我把清理代碼放在任務本身中,我可以使用List<Task>以及Task.CurrentId屬性來標識集合中的項目,但我不相信我可以在集合中處理集合中的Task對象。任務本身;還要注意該集合必須是線程安全的。

如果我在任務外(在主線程中)放置某種清理代碼,我不需要線程安全集合,但我不知道主線程中的任務何時完成。

那麼,我應該使用哪個集合來維護併發任務的列表/數組,因此我可以使用WaitAny()以及如何在完成時清理列表?

+0

你爲什麼想將其限制在N個併發任務?您是否因一次運行太多而遇到性能影響?通常,TPL擅長運行「正確」號碼。 – 2012-07-24 21:05:07

+0

@ChrisShain - 我將會有幾個這樣的進程同時運行。如果我不加以限制,一個進程可以佔用所有的工作,並且它們只會堆積在ThreadPool隊列中。 – 2012-07-24 21:09:52

+0

您可以查看其他人制定的調度程序,特別是LimitedConcurrencyLevelTask​​Scheduler.cs。 http://www.drdobbs.com/parallel/specialized-task-schedulers-in-net-4-par/228800428 – KeesDijk 2012-07-24 22:13:56

回答

0

我已經決定在必要時通過它使用數組和循環:

Task[] tasks = new Task[_allowedTasks]; 

while(true) 
{ 
    // create connection 

    while(_concurrentTasks < _allowedTasks) 
    { 
     // get job info 

     if(!reader.HasRows) break; // no work to do. 

     while(reader.Read()) 
     { 

      for (int i = 0; i < tasks.Length; i++) 
      { 
       if (tasks[i] == null || tasks[i].IsCompleted) 
       { 
        // Dispose of Task at position i 

        nextSlot = i; 
        break; 
       } 
      } 

      // fill JobInfo entity from reader 

      tasks[nextSlot] = Task.Factory.StartNew(() => ProcessJob(jobInfo)); 

      _concurrentTasks++; 
     } 
    } 

    // could break for 1 of 2 reasons, threshold met or nothing to do 
    if((_concurrentTasks < _allowedTasks) 
    { 
     // sleep for a predetermined period of time 
     Thread.Sleep(60000); 
    } 
    else 
    { 
     // wait for one thread to complete and then get more jobs 
     Task.WaitAny(_taskCollection.ToArray);  
    } 
}