1

假設我有以下的代碼,我wan't我的PC的多個CPU內核上擴展工作負載優化:多線程使用線程池一個for循環中完成的工作

double[] largeArray = getMyLargeArray(); 
double result = 0; 
for (double d : largeArray) 
    result += d; 
System.out.println(result); 

在這個例子中,我可以將在for循環中完成的工作分配給多個線程,並在繼續打印result之前驗證線程是否全部終止。因此,我想出了一些看起來像這樣:

final double[] largeArray = getMyLargeArray(); 
int nThreads = 5; 
final double[] intermediateResults = new double[nThreads]; 

Thread[] threads = new Thread[nThreads]; 
final int nItemsPerThread = largeArray.length/nThreads; 
for (int t = 0; t<nThreads; t++) { 
    final int t2 = t; 
    threads[t] = new Thread(){ 
     @Override public void run() { 
      for (int d = t2*nItemsPerThread; d<(t2+1)*nItemsPerThread; d++) 
       intermediateResults[t2] += largeArray[d]; 
     } 
    }; 
} 
for (Thread t : threads) 
    t.start(); 
for (Thread t : threads) 
    try { 
     t.join(); 
    } catch (InterruptedException e) { } 
double result = 0; 
for (double d : intermediateResults) 
    result += d; 
System.out.println(result); 

假設largeArray的長度是nThreads整除。此解決方案正常工作。

但是,我遇到了上面的for循環線程在我的程序中出現了很多問題,由於線程的創建和垃圾回收導致了很多開銷。因此,我正在通過使用ThreadPoolExecutor來修改我的代碼。給出中間結果的線程將在下一次執行中重新使用(在本例中爲求和)。

因爲我將中間結果存儲在一個事先必須知道的大小數組中,所以我想使用一個固定大小的線程池。 但是,我有麻煩,讓一個線程知道它應該在數組的哪個地方存儲它的結果。 我應該定義我自己的ThreadFactory嗎?

或者我更好的使用由方法Executors.newSingleThreadExecutor(ThreadFactory myNumberedThreadFactory)創建的ExecutorService的數組?

請注意,在我的實際程序中,很難用其他類型的東西代替double[] intermediateResults。我更喜歡一種僅限於創建正確類型的線程池的解決方案。

+1

如何使用ForkJoin框架?該框架允許您將大型計算工作分解爲「固定大小」片段,並將結果批量片段分配到線程池中。 Linky:http://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html – user268396

+0

Java 8將於下週發佈,您的使用案例與全新'Streams' API完美搭配。 –

+0

順便說一句,讓我向你保證,「垃圾收集的線程」不是你的問題。 –

回答

1

我遇到了麻煩,但是讓thread知道array應該存儲它的結果。我應該定義自己的ThreadFactory嗎?

不需要那個。執行程序使用的接口(RunnableCallable)由線程運行,您可以將任何參數傳遞給要傳遞的實現(例如,數組,開始索引和結束索引)。

A ThreadPoolExecutor確實是一個很好的解決方案。如果您的runnable支持結果,請查看FutureTask

+0

這個直截了當的答案實際上是正確的。我在想'Runnable'應該知道它在哪個線程中,哪個線程又知道它應該存儲在數組中的哪個位置。這樣我可以說線程1存儲在數組1中。實際上,這種想法很奇怪,因爲這些線程是並行的,沒有直觀的方式讓線程1正確標記爲'1'。我可以提供5個(或者一般來說:我擁有的核心數量)'Runnable's,每個存儲在陣列中的一個獨特位置。 –

+0

你是否同意使用'Executors.newFixedThreadPool(Runtime.getRuntime()。availableProcessors())'是一件好事? –

+1

這是一個很好的默認,如果你的任務是CPU綁定,是的 – fge

0

你最好創建一個「工作者」線程,它可以從隊列中獲取有關工作的信息。您的過程將創建一個最初爲空的WorkQueue,然後創建並啓動工作線程。每個工作線程將從隊列中提取工作,完成工作,並將工作放在一個「已完成」隊列中,供主人接收和處理。

+0

您描述的內容聽起來像是一個SingleThreadExecutor。 (請參閱我的文章的倒數第二段) –

1

ExecutorService爲您提供API通過Future接口獲取來自線程池的結果:

Future<Double> futureResult = executorService.submit(new Callable<Double>() { 
    Double call() { 
     double totalForChunk = 0.0; 
     // do calculation here 
     return totalForChunk; 
    } 
}); 

現在,所有你需要做的是提交任務(Callable實例),並等待結果可用:

List<Future<Double>> results = new ArrayList<Double>(); 
for (int i = 0; i < nChunks; i++) { 
    results.add(executorService.submit(callableTask)); 
} 

或者更簡單:

List<Future<Double>> results = executorService.invokeAll(callableTaskList); 

其餘的是容易的,迭代results並收集總:

double total = 0.0; 
for (Future<Double> result : results) { 
    total += result.get(); // this will block until your task is completed by executor service 
} 

有了這樣說,你不在乎你有多少線程在執行服務。您只需提交任務並在可用時收集結果。

+0

這實際上是該示例的一個很好的解決方案,但由於我(可能不清楚)的解釋,我無法將結果類型更改爲列表。在我的實際程序中,這個問題更爲重要:線程將結果存儲在雙倍的大型數據結構中,而不是一個雙重結果。 ExecutorService返回所有這些數據結構並將它們放在正確的位置是非常不方便的。 –

+0

您可以將目標數據結構始終傳遞給可調用實例,並將結果直接放在那裏。不需要從工作中返回任何東西。但是您仍然可以使用Feature.get()在作業完成時同步。 – hoaz