2012-01-05 89 views
2

我需要用Java編寫一個程序,它將讀取目錄樹中相對大量(〜50,000個)文件,處理數據並將處理後的數據輸出到單獨的(平坦的)目錄。並行讀取和寫入多個文件

目前,我有這樣的事情:

private void crawlDirectoyAndProcessFiles(File directory) { 
    for (File file : directory.listFiles()) { 
    if (file.isDirectory()) { 
     crawlDirectoyAndProcessFiles(file); 
    } else { 
     Data d = readFile(file); 
     ProcessedData p = d.process(); 
     writeFile(p,file.getAbsolutePath(),outputDir); 
    } 
    } 
} 

我只想說,每個這樣的方法被刪除,下調爲便於閱讀,但他們都做工精細。整個過程工作正常,但速度很慢。數據處理通過遠程服務進行,需要5-15秒。乘以5萬...

我從來沒有做過任何事情多線程之前,但我想我可以得到一些相當不錯的速度提高,如果我這樣做。任何人都可以提供一些指導我如何有效地平行這種方法?

+0

文件的大小和處理的密度如何?我問,因爲如果有更多的時間花在從磁盤讀取文件上,那麼你實際上不會在線程中獲得太多的收益。 – SimonC 2012-01-05 05:01:05

+1

對於幾乎可以確定爲磁盤綁定的任務,您不太可能獲得任何加速。除非您試圖並行化位於不同物理驅動器上的目錄... – Mysticial 2012-01-05 05:02:00

+0

您是將輸出轉換爲單個文件還是每個文件的文件? – MahdeTo 2012-01-05 05:13:56

回答

5

我會使用ThreadPoolExecutor來管理線程。你可以做這樣的事情:

private class Processor implements Runnable { 
    private final File file; 

    public Processor(File file) { 
     this.file = file; 
    } 

    @Override 
    public void run() { 
     Data d = readFile(file); 
     ProcessedData p = d.process(); 
     writeFile(p,file.getAbsolutePath(),outputDir); 
    } 
} 

private void crawlDirectoryAndProcessFiles(File directory, Executor executor) { 
    for (File file : directory.listFiles()) { 
     if (file.isDirectory()) { 
      crawlDirectoryAndProcessFiles(file,executor); 
     } else { 
      executor.execute(new Processor(file); 
     } 
    } 
} 

你會使用獲得執行人:

ExecutorService executor = Executors.newFixedThreadPool(poolSize); 

其中poolSize是你想要去的線程在一次的最大數量。 (在這裏有一個合理的數字非常重要; 50,000個線程並不是一個好主意,合理的數字可能是8)。請注意,在排隊所有文件後,主線程可以等待,直到調用完成executor.awaitTermination

+0

在最後考慮一個「連接」,以確保所有處理都完成,以便與原始方法的行爲保持一致 – 2012-01-06 10:11:04

+1

@AdrianShum - 好點;我建議現在使用'ExecutorService.awaitTermination()' – 2012-01-06 18:20:51

+1

這個例子最好用fork連接池運行,這個池可以和'new ForkJoinPool(numprocs)'一起使用並等待終止。密集流程實際上對這些池的效果最好,而像Fibonacci序列這樣的小流程對單線程或線程執行程序來說可能是最好的(對於正確管理的自定義代碼更好)。 – 2013-12-17 16:02:41

1

最簡單的(也許是最合理的)方法之一是有一個線程池(看看相應的執行程序)。主線程負責在目錄中進行爬網。遇到文件時,創建一個「作業」(Runnable/Callable)並讓Executor處理作業。

(這應該是足以讓你開始,我不想讓太多的具體代碼怎麼把它不應該是你很難搞清楚,一旦您已經閱讀執行人,贖回等部分)

5

假設您有一個硬盤(即只允許單個同時讀取操作的東西,而不是SSD或RAID陣列,網絡文件系統等),那麼您只需要一個線程執行IO(從/寫入磁盤)。此外,您只需要儘可能多的線程執行CPU內核操作,否則時間將會在上下文切換中浪費。

鑑於上述限制,下面的代碼應該適合您。單線程執行程序確保一次只能執行一個Runnable。固定線程池確保不超過NUM_CPUSRunnable s在任何時間都在執行。

這件事不能做的一件事是提供處理完成時間的反饋。

private final static int NUM_CPUS = 4; 

private final Executor _fileReaderWriter = Executors.newSingleThreadExecutor(); 
private final Executor _fileProcessor = Executors.newFixedThreadPool(NUM_CPUS); 

private final class Data {} 
private final class ProcessedData {} 

private final class FileReader implements Runnable 
{ 
    private final File _file; 
    FileReader(final File file) { _file = file; } 
    @Override public void run() 
    { 
    final Data data = readFile(_file); 
    _fileProcessor.execute(new FileProcessor(_file, data)); 
    } 

    private Data readFile(File file) { /* ... */ return null; }  
} 

private final class FileProcessor implements Runnable 
{ 
    private final File _file; 
    private final Data _data; 
    FileProcessor(final File file, final Data data) { _file = file; _data = data; } 
    @Override public void run() 
    { 
    final ProcessedData processedData = processData(_data); 
    _fileReaderWriter.execute(new FileWriter(_file, processedData)); 
    } 

    private ProcessedData processData(final Data data) { /* ... */ return null; } 
} 

private final class FileWriter implements Runnable 
{ 
    private final File _file; 
    private final ProcessedData _data; 
    FileWriter(final File file, final ProcessedData data) { _file = file; _data = data; } 
    @Override public void run() 
    { 
    writeFile(_file, _data); 
    } 

    private Data writeFile(final File file, final ProcessedData data) { /* ... */ return null; } 
} 

public void process(final File file) 
{ 
    if (file.isDirectory()) 
    { 
    for (final File subFile : file.listFiles()) 
     process(subFile); 
    } 
    else 
    { 
    _fileReaderWriter.execute(new FileReader(file)); 
    } 
} 
+0

當你調用'_fileReaderWriter.execute(new FileWriter(_file,processedData));',這是一個異步調用? – 2014-01-29 17:14:38

+0

是的,它會在'_fileReaderWriter'隊列的一個線程上執行一個新任務。 – SimonC 2014-02-03 13:07:28