2012-07-24 79 views
3

我有一個應用程序可以處理來自輸入目錄的多個文件中存儲的數據,然後根據該數據生成一些輸出。多線程文件處理和報告

到目前爲止,應用程序的工作原理在連續的基礎上,也就是說,它會啓動一個「經理」線程

  • 讀取輸入目錄的內容爲File[]陣列
  • 流程序列中的每個文件,結果存儲
  • 終止時,所有文件都處理

我想這個轉換爲多線程應用程序,在其中的「人時效器」線程

  • 讀取輸入目錄的內容到一個File[]陣列
  • 推出了一些‘處理器’線程,每個線程的處理的單個文件中,存儲結果,並返回該文件的摘要報告以「經理」線程時所有文件都被處理

的「處理器」的線程數是最多等於文件的數量,因爲它們會通過ThreadPoolExecutor回收

  • 終止。

    任何避免使用join()wait()/notify()的解決方案都是可取的。基於上述情況

    1. 什麼是那些具有「處理器」主題報告給「經理」線程的最佳方式?基於CallableFuture的實現是否有意義?
    2. 「經理」線程如何知道所有「處理器」線程何時完成,即何時處理完所有文件?
    3. 如果處理器線程需要「太長時間」(即,儘管經過了預先配置的時間量,它還沒有返回結果),是否有一種「計時」處理器線程並終止它的方法?

    任何指針或(僞)源代碼的例子將不勝感激。

  • +1

    我不是線程專家,但這裏是我的快速拍攝給你一個方向: 1.您可以使用等待/通知機制來控制報告行爲。 2.主管理器線程可以在讀取文件時處理數據。它是否需要等到所有文件都處理完畢? 3.有TimerTask,可以在你的情況下得心應手。 – Sid 2012-07-24 21:45:20

    +0

    感謝您的快速回答。管理器線程不需要等待所有處理器線程完成,但它確實需要知道它們何時完成。我寧願不使用wait()/ notify(),因此引用了Callable接口。無論如何,請將您的評論置於答案中,以便我可以投票。 :-) – PNS 2012-07-24 21:52:08

    +1

    您是否需要同步訪問公共資源?就像一個文件或緩存你收集你的結果? – Mithon 2012-07-24 21:57:33

    回答

    2

    你絕對可以做到這一點,而無需使用join()wait()/notify()自己。

    你應該看看java.util.concurrent.ExecutorCompletionService開始。

    我看到它,你應該寫以下類方式:

    • FileSummary - 該策略將文件轉換成FileSummary - 保存處理單個文件
    • FileProcessor implements Callable<FileSummary>的結果簡單的值對象導致
    • File Manager - 創建FileProcessor情況下,他們提交給工作隊列,然後彙總結果的高級別經理。然後

    的文件管理器看起來是這樣的:

    class FileManager { 
        private CompletionService<FileSummary> cs; // Initialize this in constructor 
    
        public FinalResult processDir(File dir) { 
         int fileCount = 0; 
         for(File f : dir.listFiles()) { 
         cs.submit(new FileProcessor(f)); 
         fileCount++; 
         } 
    
         for(int i = 0; i < fileCount; i++) { 
         FileSummary summary = cs.take().get(); 
         // aggregate summary into final result; 
         } 
        } 
    

    如果你想實現你可以使用poll()方法上CompletionService而不是take()超時。

    +0

    這非常接近我在使用Callable/Future對象時的想法。 ExecutorCompletionService是一個很棒的建議! poll()的替代方法可能是使用帶有超時的get()。請編輯get()調用,它應該是cs.take()。get(),而不是cs.take.get()。謝謝! :-) – PNS 2012-07-24 23:34:04

    +0

    @PNS其實我不確定在get()上使用超時會起作用,因爲take()會阻塞,直到有可用結果,即get()永遠不會阻塞。 – 2012-07-25 03:23:10

    +0

    如果使用ExecutorCompletionService,則更正,但如果不是,則可以使用超時。 – PNS 2012-07-25 07:30:18

    1

    wait()/notify()是非常低級的基元,你想避免它們是對的。

    最簡單的解決方案是使用線程安全隊列(或堆棧等 - 在這種情況下並不重要)。在啓動工作線程之前,主線程可以將所有File添加到線程安全隊列/堆棧。然後啓動工作線程,並讓它們全部拉動並處理它們,直到沒有剩下的線程爲止。

    工作線程可以將結果添加到另一個線程安全隊列/堆棧,主線程可以從中獲取結果。主線程知道有多少個File,因此當它檢索到相同數量的結果時,它會知道該作業已完成。

    就像java.util.concurrent.BlockingQueue會工作,並且java.util.concurrent還有其他線程安全的集合,這也可以。

    您還詢問了終止工作線程耗時過長的問題。我會直接告訴你:如果你可以使工作線程上運行的代碼足夠強大,以至於你可以安全地離開這個功能,那麼你會使事情變得更簡單。

    如果您確實需要此功能,最簡單,最可靠的解決方案是讓每個線程的「終止」標誌,使工人的任務代碼檢查該標誌頻繁出境,如果它被設置。爲工作人員制定一個自定義班級,併爲此包含一個volatile boolean字段。還包括一個setter方法(因爲volatile,它不需要是​​)。

    如果工人發現它的「終止」標誌設置,它可以推動其File對象回工作隊列/堆棧上,以便另一個線程可以處理它。當然,如果有一些問題意味着File不能被成功處理,這將導致無限循環。

    最好是使職工的代碼非常簡單和強大,所以你不必擔心它「沒有終止」。

    1
    1. 無需他們回報。只需要計算剩餘工作的數量並在線程完成時減少線程數量。

    2. 當計數到達的剩餘要做的工作爲零,所有的「處理器」線程完成。

    3. 當然,只是代碼添加到線程。當它開始工作時,檢查時間並計算停止時間。定期(例如,當你從文件中讀取更多信息時),檢查它是否超過停止時間,如果是,停止。