2011-04-16 191 views
25

我有一個for循環,其中迭代i的計算不依賴於先前迭代中完成的計算。並行化for循環

我想並行化for循環(我的代碼是在java中),以便可以在多個處理器上同時運行多個迭代的計算。我應該爲每次迭代計算創建一個線程,即要創建的線程數等於迭代次數(for循環中的迭代次數很多)?這個怎麼做?

回答

10

您不應該手動執行線程處理。相反:

  • 創建reasonably-sized thread pool executor service(如果你的計算做任何IO,你有內核使用盡可能多線程)。
  • 運行一個循環,將每個單獨的計算提交給執行程序服務並保留生成的Future對象。請注意,如果每個計算僅包含少量工作,則會產生大量開銷,甚至可能比單線程程序慢。在這種情況下,按照mdma的建議提交做計算包的作業。
  • 運行第二個循環,收集來自所有Future S中的結果(它會隱等到所有的計算已完成)
  • 關閉執行服務
+0

第一循環甚至可以到的invokeAll'單個調用()'來代替。 – 2011-04-16 11:45:49

+0

@Péter:在大多數情況下,您必須運行一個循環才能生成所有Callable的代碼,不妨在當時提交它們。 – 2011-04-16 11:48:39

+0

是真實的,除非有人想把處理任務的準備工作分開。 – 2011-04-16 12:50:17

2

不,你不應該創建一個線程每次迭代。線程的最佳數量與可用處理器的數量有關 - 線程太多,並且浪費了太多時間上下文切換而無法提高性能。

如果您不完全依賴Java,則可能需要嘗試一種並行的高性能C系統,如OpenMPI。 OpenMPI適用於這類問題。

+0

你對線程的看法:處理器只有在操作是CPU而不是IO綁定時纔是真的。例如,如果您要從API中抓取小型JSON文檔,請求的延遲時間可能會比處理數據所需的時間長。更多的線程會幫助,而不是傷害。 – 2016-03-22 16:17:40

0

不要自己創建線程。我建議您使用fork/join框架(jsr166y)並創建迭代給定範圍項目的任務。它將爲您處理線程管理,使用硬件支持的線程數量。

任務粒度是這裏的主要問題。如果每次迭代的計算量相對較低(比如少於100次操作),那麼將每次迭代作爲一個單獨的任務執行將會引入大量的任務調度開銷。最好讓每個任務都接受一個要計算的參數列表,並將結果作爲列表返回。通過這種方式,您可以讓每項任務計算1,10或數千個元素,從而將任務劃分爲合理的級別,以保持工作可用,並減少任務管理開銷。

jsr166z中還有一個ParallelArray類,它允許對數組進行重複計算。如果你正在計算的值是原始類型,這可能對你有用。

45

下面是一個小例子,您可能會發現對並行化入門很有幫助。它假定:

  1. 您創建一個Input對象,其中包含您的計算的每個迭代的輸入。
  2. 您創建一個包含計算每次迭代輸入的輸出的Output對象。
  3. 您希望傳入輸入列表並一次返回所有輸出列表。
  4. 您的輸入是一個合理的工作要做,所以開銷不是太高。

如果您的計算非常簡單,那麼您可能需要考慮分批處理它們。你可以通過在每個輸入中輸入100來做到這一點。它使用與系統中的處理器一樣多的線程。如果你正在處理純粹的CPU密集型任務,那麼這可能是你想要的數字。你會想去更高,如果他們正在阻塞等待別的東西(磁盤,網絡,數據庫等)

public List<Output> processInputs(List<Input> inputs) 
     throws InterruptedException, ExecutionException { 

    int threads = Runtime.getRuntime().availableProcessors(); 
    ExecutorService service = Executors.newFixedThreadPool(threads); 

    List<Future<Output>> futures = new ArrayList<Future<Output>>(); 
    for (final Input input : inputs) { 
     Callable<Output> callable = new Callable<Output>() { 
      public Output call() throws Exception { 
       Output output = new Output(); 
       // process your input here and compute the output 
       return output; 
      } 
     }; 
     futures.add(service.submit(callable)); 
    } 

    service.shutdown(); 

    List<Output> outputs = new ArrayList<Output>(); 
    for (Future<Output> future : futures) { 
     outputs.add(future.get()); 
    } 
    return outputs; 
} 
+0

這很好。這與fork和join有什麼不同。請不要介意,如果我錯了,我只是一個新手用戶 – CTsiddharth 2012-03-15 11:21:55

+0

+1不錯的一塊,thx – Jakob 2013-07-05 08:39:35

+0

好的答案@ WhiteFang34 – 2013-09-18 23:14:48