9

我有一個應用程序,其中有1個大文件的1000個小部分。以並行方式執行N個線程

我一次最多需要上傳16個零件。

我使用.Net的線程並行庫。

我使用的Parallel.For在多個部分來劃分和分配的1種方法,其應該爲每個一部分來執行,並設置DegreeOfParallelism〜16

我需要與由不同的部分產生的校驗和值來執行1種方法上傳,所以我必須設置某些機制,我必須等待所有部件上傳說1000完成。 在TPL庫我面臨1個問題,它是隨機執行從1000的16個線程中的任何一個。

我想要一些機制,我可以運行最初的16個線程,如果第一或第二或任何16線程完成任務接下來的第17部分應該開始。

我該如何做到這一點?

enter image description here

+4

我喜歡的圖像 – Abdullah

+0

如果@ USR的回答是不行的,看看[我的回答這裏](http://stackoverflow.com/a/15056827/106159)可能適用。否則,如果你可以使用TPL的DataflowBlock類,可能會更好(我想你不能,因爲你指定C#4) –

+0

@Abdullah它只是缺少一些手繪紅圈 – TheLethalCoder

回答

2

下面是這樣做的手動方法。

您需要一個隊列。隊列是未完成任務的序列。您必須將其出列並放入工作任務列表中。當任務完成後,將其從工作任務列表中刪除,並從隊列中取出另一個任務。主線程控制這個過程。這裏是如何做到這一點的示例。

對於我使用的整數列表,但它應該適用於其他類型,因爲它使用泛型。

private static void Main() 
{ 
    Random r = new Random(); 
    var items = Enumerable.Range(0, 100).Select(x => r.Next(100, 200)).ToList(); 

    ParallelQueue(items, DoWork); 
} 

private static void ParallelQueue<T>(List<T> items, Action<T> action) 
{ 
    Queue pending = new Queue(items); 
    List<Task> working = new List<Task>(); 

    while (pending.Count + working.Count != 0) 
    { 
     if (pending.Count != 0 && working.Count < 16) // Maximum tasks 
     { 
      var item = pending.Dequeue(); // get item from queue 
      working.Add(Task.Run(() => action((T)item))); // run task 
     } 
     else 
     { 
      Task.WaitAny(working.ToArray()); 
      working.RemoveAll(x => x.IsCompleted); // remove finished tasks 
     } 
    } 
} 

private static void DoWork(int i) // do your work here. 
{ 
    // this is just an example 
    Task.Delay(i).Wait(); 
    Console.WriteLine(i); 
} 

如果您遇到如何爲自己實施DoWork的問題,請讓我知道。因爲如果你改變方法簽名,你可能需要做一些改變。

更新

您還可以使用異步做到這一點等待,而不會阻塞主線程。

private static void Main() 
{ 
    Random r = new Random(); 
    var items = Enumerable.Range(0, 100).Select(x => r.Next(100, 200)).ToList(); 

    Task t = ParallelQueue(items, DoWork); 

    // able to do other things. 

    t.Wait(); 
} 

private static async Task ParallelQueue<T>(List<T> items, Func<T, Task> func) 
{ 
    Queue pending = new Queue(items); 
    List<Task> working = new List<Task>(); 

    while (pending.Count + working.Count != 0) 
    { 
     if (working.Count < 16 && pending.Count != 0) 
     { 
      var item = pending.Dequeue(); 
      working.Add(Task.Run(async() => await func((T)item))); 
     } 
     else 
     { 
      await Task.WhenAny(working); 
      working.RemoveAll(x => x.IsCompleted); 
     } 
    } 
} 

private static async Task DoWork(int i) 
{ 
    await Task.Delay(i); 
} 
+0

爲什麼你使用'新的任務'而不是'Task.Run'?在服務器代碼中用「Wait」阻塞通常也是一個壞主意。 – avo

+0

我剛剛給出了方式。 'Task.Delay.Wait'只是一個例子。你可以在DoWork裏面做任何你想做的事情。我感謝你幫助了我。我修復了代碼。現在更好(?)....即時通訊沒有多線程經驗。即時通訊新編程(只有1年),所以忍受着我;)@avo –

+0

@avo看看更新。好嗎? –

4
var workitems = ... /*e.g. Enumerable.Range(0, 1000000)*/; 
SingleItemPartitioner.Create(workitems) 
.AsParallel() 
.AsOrdered() 
.WithDegreeOfParallelism(16) 
.WithMergeOptions(ParallelMergeOptions.NotBuffered) 
.ForAll(i => { Thread.Slee(1000); Console.WriteLine(i); }); 

這應該是你所需要的。我忘了這些方法是如何命名的......查看文檔。

通過在睡眠1秒後打印到控制檯(此示例代碼所做的)來測試此操作。

+0

我的回答被接受了。但我認爲它不應該。我認爲OP有這裏描述的http://stackoverflow.com/questions/33869830/ordered-parallel-is-not-working-as-expected-convert-list-into-ienumerable/33869969#33869969 .'workitems。選擇(x => x)'來修復這個項目列表的並行。你的方法將把列表分成塊,而這不是OP想要的。因此將列表更改爲可枚舉將解決他的問題。 –

+0

@ M.kazemAkhgary這實際上是一個很好的觀點,我忘記了分區。這實際上是一個糟糕的TPL默認值,另一個。 – usr

5

一個可能的候選人可能是TPL Dataflow。這是一個演示,它接收整數流並將它們輸出到控制檯。您可以設置要在平行紡MaxDegreeOfParallelism取其多線程:

void Main() 
{ 
    var actionBlock = new ActionBlock<int>(
      i => Console.WriteLine(i), 
      new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 16}); 

    foreach (var i in Enumerable.Range(0, 200)) 
    { 
     actionBlock.Post(i); 
    } 
} 

,如果你想有多個生產者/消費者這也可以很好地擴展。

1

另一種選擇是使用BlockingCollection<T>爲您的文件閱讀器線程和你的16個上傳線程之間的隊列。每個上傳者線程都會循環使用阻塞集合,直到完成爲止。

而且,如果要限制隊列中的內存消耗,可以設置阻塞集合的上限,以便在緩衝區達到容量時文件讀取器線程將暫停。這在您可能需要限制每個用戶/ API調用使用的內存的服務器環境中特別有用。

// Create a buffer of 4 chunks between the file reader and the senders 
BlockingCollection<Chunk> queue = new BlockingCollection<Chunk>(4); 

// Create a cancellation token source so you can stop this gracefully 
CancellationTokenSource cts = ... 

文件讀取器線程

... 
queue.Add(chunk, cts.Token); 
... 
queue.CompleteAdding(); 

發送線程

for(int i = 0; i < 16; i++) 
{ 
    Task.Run(() => { 
     foreach (var chunk in queue.GetConsumingEnumerable(cts.Token)) 
     { 
      .. do the upload 
     } 
    }); 
}