2014-01-17 58 views
2

我使用Parallel.ForEach工作一堆項目。問題是,我想根據打開的工作人員(插槽)的數量來確定哪些項目能夠工作的優先順序。例如。如果我在8個並行的東西上工作,並在任務1-4之間打開一個插槽,我希望將簡單的工作分配給這些插槽。插槽的下半部分將得到努力工作。通過這種方式,我不會把所有8個插槽都捆綁在一起進行艱苦/長時間運行的工作,易/快速的項目將首先運行。我實現了這個如下:使Parallel.ForEach等待工作,直到一個插槽打開

守則

const int workers = 8; 
List<Thing> thingsToDo = ...; //Get the things that need to be done. 
Thing[] currentlyWorkingThings = new Thing[workers]; //One slot for each worker. 

void Run() { 
    Parallel.ForEach(PrioritizeThings(thingsToDo), o => { 
     int index = 0; 

     //"PrioritizeTasks" added this thing to the list of currentlyWorkingThings. 
     //Find my position in this list. 
     lock (currentlyWorkingThings) 
      index = currentlyWorkingThings.IndexOf(o); 

     //Do work on this thing... 

     //Then remove it from the list of currently working things, thereby 
     // opening a new slot when this worker returns/finishes. 
     lock (currentlyWorkingThings) 
      currentlyWorkingThings[index] = null; 
    }); 
} 

IEnumerable<Thing> PrioritizeThings(List<Thing> thingsToDo) { 
    int slots = workers; 
    int halfSlots = (int)Math.Ceiling(slots/2f); 

    //Sort thingsToDo by their difficulty, easiest first. 

    //Loop until we've worked every Thing. 
    while (thingsToDo.Count > 0) { 
     int slotToFill = ...; //Find the first open slot. 
     Thing nextThing = null; 

     lock (currentlyWorkingThings) { 
      //If the slot is in the "top half", get the next easy thing - otherwise 
      // get the next hard thing. 
      if (slotToFill < halfSlots) 
       nextThing = thingsToDo.First(); 
      else 
       nextThing = thingsToDo.Last(); 

      //Add the nextThing to the list of currentlyWorkingThings and remove it from 
      // the list of thingsToDo. 
      currentlyWorkingThings[slotToFill] = nextThing; 
      thingsToDo.Remove(nextThing); 
     } 

     //Return the nextThing to work. 
     yield return nextThing; 
    } 
} 

的問題

所以我看到這裏的問題是,Parallel請求,接下來的事情去努力從PrioritizeThings開始插槽(在現有物品完成之前)。我認爲Parallel正在展望未來,並提前準備好工作。我希望它不要這樣做,並且只在完成完成後才填寫工人/插槽。我想要解決這個問題的唯一方法是在PrioritizeThings中添加一個睡眠/等待循環,在它看到一個合法的開放時隙之前,它不會返回一個工作。但我不喜歡這樣,我希望有一些方法可以使Parallel等待更長時間才能開始工作。有什麼建議麼?

+0

你可以保留8(加1待定)的東西正在努力。如果你的複雜任務少於4個,那麼給它一個複雜的任務,否則給它一個簡單的任務。你會有一個不平衡的時間最長,直到最簡單的任務完成。 – Jesse

+0

@Jesse我認爲這是我必須要做的。只給「並行」的8個項目開始工作,然後把它放在一邊看,直到所有項目已經在8個項目塊(或無論那裏有多少工人)中工作。不知道這是你的建議,但這是我在閱讀你的評論時想到的。 –

+0

(大聲思考)我不能做我剛纔建議的事情,因爲直到當前所有項目的8個完成後,新項目纔會有效。衛生署! @Jesse - 如果你想澄清你的意思,我會很感激。 –

回答

3

有一種方法可以支持您正在描述的情況。

當您創建ForEach時,您需要傳遞ParallelOptions以及非標準TaskScheduler。難的是創造一個TaskSchedueler做優先系統對你來說,幸運的是微軟發佈的包含一個這樣的調度例子一包名爲「ParallelExtensionsExtras」以其調度QueuedTaskScheduler

private static void Main(string[] args) 
{ 
    int totalMaxConcurrancy = Environment.ProcessorCount; 
    int highPriorityMaxConcurrancy = totalMaxConcurrancy/2; 

    if (highPriorityMaxConcurrancy == 0) 
     highPriorityMaxConcurrancy = 1; 

    QueuedTaskScheduler qts = new QueuedTaskScheduler(TaskScheduler.Default, totalMaxConcurrancy); 
    var highPriortiyScheduler = qts.ActivateNewQueue(0); 
    var lowPriorityScheduler = qts.ActivateNewQueue(1); 

    BlockingCollection<Foo> highPriorityWork = new BlockingCollection<Foo>(); 
    BlockingCollection<Foo> lowPriorityWork = new BlockingCollection<Foo>(); 

    List<Task> processors = new List<Task>(2); 

    processors.Add(Task.Factory.StartNew(() => 
    { 
     Parallel.ForEach(highPriorityWork.GetConsumingPartitioner(), //.GetConsumingPartitioner() is also from ParallelExtensionExtras, it gives better performance than .GetConsumingEnumerable() with Parallel.ForEeach(
         new ParallelOptions() { TaskScheduler = highPriortiyScheduler, MaxDegreeOfParallelism = highPriorityMaxConcurrancy }, 
         ProcessWork); 
    }, TaskCreationOptions.LongRunning)); 

    processors.Add(Task.Factory.StartNew(() => 
    { 
     Parallel.ForEach(lowPriorityWork.GetConsumingPartitioner(), 
         new ParallelOptions() { TaskScheduler = lowPriorityScheduler}, 
         ProcessWork); 
    }, TaskCreationOptions.LongRunning)); 


    //Add some work to do here to the highPriorityWork or lowPriorityWork collections 


    //Lets the blocking collections know we are no-longer going to be adding new items so it will break out of the `ForEach` once it has finished the pending work. 
    highPriorityWork.CompleteAdding(); 
    lowPriorityWork.CompleteAdding(); 

    //Waits for the two collections to compleatly empty before continueing 
    Task.WaitAll(processors.ToArray()); 
} 

private static void ProcessWork(Foo work) 
{ 
    //... 
} 

即使你有Parallel.ForEach兩個實例運行它們兩者的組合總數不會超過您通過MaxConcurrency傳入QueuedTaskScheduler構造函數中的值,並且如果兩項工作都有工作要做,則優先清空highPriorityWork集合(最多可達到所有可用插槽的1/2,以便您不會窒息低優先級隊列,您可以輕鬆地將其調整爲更高或更低的比例取決於你的性能需求)。

如果你不希望高優先級總是贏,而你更希望有一個在兩個列表之間交替的「循環」風格的計劃程序(所以你不希望快速項目總是贏,但只是讓他們隨着緩慢的項目洗牌),你可以設置相同的優先級到兩個或更多的隊列(或只是使用RoundRobinTaskSchedulerQueue這是做同樣的事情)

+0

有趣。如果highPriorityScheduler中的所有事情都完成了,那麼lowPriorityScheduler會以低優先級填充那些額外的插槽嗎?例如。我不想讓簡單的東西完成,然後4個插槽閒置,而硬件隊列很長。 –

+0

是的,它會的。我可以推薦的一件事是,如果你沒有一個高優先級項目的連續流,你永遠不會處理低優先級項目,那麼將高優先級'ParallelOptions'改爲'{TaskScheduler = highPriortiyScheduler,MaxDegreeOfParallelism = 4}'。通過將該約束添加到foreach中,高優先級的foreach將一次處理0-4個項目,而低優先級將一次處理0-8個項目,但如果工作進行時會放棄插入高優先級項目的插槽那個隊列。 –

+0

我已經更新了代碼示例以顯示我剛剛討論的內容。一個缺點是,如果硬件完成簡單的東西,最多隻能使用4個內核進行處理。如果你不擔心被困住的困難工作,你可以刪除約束條件,或者將其更改爲更加極端的比例,例如'totalMaxConcurrancy-1' –

相關問題