2012-07-20 22 views
3

我有一塊程序處理大量文件,其中對於每個文件需要做兩件事:首先,讀取和處理一些文件,然後得到的MyFileData得到存儲。第一部分可以並行化,第二部分不可以。優化許多文件的並行處理

順序做一切是很慢的,因爲CPU必須等待磁盤,然後它了一下,然後發出另一個請求,並再次等待......

我做了以下

class MyCallable implements Callable<MyFileData> { 
    MyCallable(File file) { 
     this.file = file; 
    } 
    public MyFileData call() { 
     return someSlowOperation(file); 
    } 
    private final File file; 
} 

for (File f : files) futures.add(executorService.submit(new MyCallable(f))); 
for (Future<MyFileData> f : futures) sequentialOperation(f.get()); 

它有很大幫助。不過,我希望能夠提升兩件事情:

  • sequentialOperation被固定的順序,而不是處理任何結果,請首先執行。我該如何改變它?

  • 有成千上萬的文件需要處理,並且啓動數千個磁盤請求可能會導致磁盤被毀壞。通過使用Executors.newFixedThreadPool(10)我限制了這個數字,但是我正在尋找更好的東西。理想情況下,它應該是自我調整,以便它在不同的計算機上工作最佳(例如,當可用RAID和/或NCQ等時,發出更多請求)。我不認爲它可以基於找出硬件配置,但測量處理速度和基於它的優化應該不知何故是可能的。任何想法?

+1

誰從未做過一個快速出樣,我相信「CHII」有答案是:按原樣繼續並行操作,但將這些結果放入磁盤寫入的隊列(對於磁盤IO的串行性質更適合)。 – BonanzaDriver 2012-07-20 14:19:06

回答

6

的sequentialOperation被以固定的順序而不是處理的任何結果是可用的第一執行。我該如何改變它?

這正是CompletionService所做的:它並行處理任務並在完成時返回它們,而不管提交順序如何。

簡體(未測試)例如:

int NUM_THREADS = Runtime.getRuntime().availableProcessors(); 
ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS); 
CompletionService<MyFileData> completionService = new ExecutorCompletionService<MyFileData>(executor); 

for (File f : files) futures.add(completionService.submit(new MyCallable(f))); 

for(int i = 0; i < futures.size(); i++) { 
    Future<MyFileData> next = completionService.take(); 
    sequentialOperation(next.get()); 
} 

有數以千計的文件進行處理並開始數千個磁盤的請求可能導致磁盤搗毀。通過使用Executors.newFixedThreadPool(10)我限制了這個數字,但是我正在尋找更好的東西。

我不是100%肯定的那一個。我想這取決於你擁有多少個磁盤,但我會認爲磁盤訪問部分不應該被拆分成太多的線程(每個磁盤一個線程可能是明智的):如果多個線程同時訪問一個磁盤,它會花更多的時間尋求比閱讀。

+1

一次詢問兩件事情是一個糟糕的主意。也許我會將磁盤部分移到一個新問題中。 「CompletionService」是更容易的部分的最簡單的解決方案,並立即工作。 – maaartinus 2012-07-20 15:59:18

2

sequentialOperation以固定的順序執行,而不是處理先有的結果。我該如何改變它?

假設:每個someSlowOperation(file);通話是要採取可變的時間量,從而,你想,只要你接受她處理MyFileData,但不能同時爲另一sequentialOperation

您可以通過設置生產者/消費者隊列來實現此目的。

生產者是你在你的例子中執行的callables,添加的位將結果添加到等待處理的工作隊列中。

消費者是sequentialOperation()調用 - 它在自己的線程中運行,並且只有一個。這個線程所做的就是取出隊列的頭部並對其進行處理,直到程序結束。

這樣,您可以最大限度地使用機器上的所有資源。

相關的職位有一些示例代碼:Producer/Consumer threads using a Queue

編輯:我想你可能想,因爲它很不透明給任何人之前

public class Main { 

    private final ExecutorService producerExecutor = Executors.newFixedThreadPool(10); 
    private final ExecutorService consumerExecutor = Executors.newFixedThreadPool(1); 
    private final LinkedBlockingQueue<MyData> queue = new LinkedBlockingQueue();//or some other impl 

    abstract class Producer implements Runnable{ 
     private final File file; 
     Producer(File file) { 
      this.file = file; 
     } 

     public void run() { 
      MyData result = someLongAssOperation(file); 
      queue.offer(result); 
     } 

     public abstract void someLongAssOperation(File file); 
    } 

    abstract class Consumer implements Runnable { 
     public void run() { 
      while (true) { 
       sequentialOperation(queue.take()); 
      } 
     } 

     public abstract void sequentialOperation(MyData data); 
    } 

    private void start() { 
     consumerExecutor.submit(new Consumer(){ 
      //implement sequentialOperation here 
     }); 

     for (File f : files) { 
      producerExecutor.submit(new Producer(file) { 
       //implement the someLongAssOperation() 
      }); 
     } 

    } 

    public static void main(String[] args) { 
     new Main().start();  
    } 

} 
+0

相當複雜,但很好知道。現在我堅持使用另一個答案中的CompletionService,因爲它的工作量少得多。也許我需要稍後運行我自己的隊列,讓我們看看。 – maaartinus 2012-07-20 15:57:05