2014-01-19 89 views
0

這裏是什麼,我想做一個簡要的產生多個線程,我在那裏每天的基礎上動態生成是採取從單個集合的輸入,並把結果在單個集合

  1. 數量的文本文件的方案。 0 至8每天。每個文件的大小可以從小到大。取決於 當天的數據。
  2. 需要對它們執行一些檢查(業務檢查)。

我打算在最短的時間內完成任務,因此試圖編寫一個並行執行程序來執行這些文件的檢查。

我的想法是

  1. 存儲n文件的併發收集(的ConcurrentLinkedQueue)
  2. 刪除一個文件,生成一個線程,運行該文件的所有檢查
  3. 自1個文件沒有關係到另一個我想能夠處理多個文件
  4. 將結果存儲在另一個併發集合(ConcurrentLinkedQueue ...它被轉換爲不同的html pdf報告)
  5. 注意:線程數可以不同於文件數量(我想線程數可配置,它不是文件數=線程數的情況)

我的理解是這樣,我應該能夠完成每天至少檢查一次。

我有我的代碼如下圖所示,什麼讓我困惑「如何存儲在每個線程完成後,單個集合中的所有線程的結果」,我的直覺是我正在做一些有趣的事情(不正確)本人存放結果的方式。

二疑問句要檢查是否有人forsees下面

三疑問句這似乎是一個常見的情況(對我)的指針在代碼段的任何其他問題,以設計模式代碼片段解決這個

注意:我使用JDK 6

public class CheckExecutor { 
    // to store all results of all threads here , then this will be converted to html/pdf files 
    static ConcurrentLinkedQueue<Result> fileWiseResult = new ConcurrentLinkedQueue<Result>(); 

    public static void main(String[] args) { 
     int numberOfThreads=n; // need keep it configurable 
     Collection<ABCCheck> checksToExecute // will populate from business logic , ABCCheck is interface , has a method check() , there are different implementations 

     ConcurrentLinkedQueue<File> fileQueue = new ConcurrentLinkedQueue<File>(); // list of files for 1 day , may vary from 0 to 8 
     int maxNumOfFiles = fileQueue.size(); 

     ThreadGroup tg = new ThreadGroup ("Group"); 
     // If more number of threads than files (rare , can be considered corener case) 
     if (maxNumOfFiles < numberOfThreads) numberOfThreads=maxNumOfFiles; 
     // loop and start number of threads 
     for(int var=0;var<numberOfThreads;var++) 
     { 
      File currentFile = fileQueue.remove(); 
      // execute all checks on 1 file using checksToExecute 
      ExecuteAllChecks checksToRun = new ExecuteAllChecks(); // business logic to populate checks 
      checksToRun.setchecksToExecute(checksToExecute); 
      checksToRun.setcheckResult(fileWiseResult); // when each check finishes want to store result here 
      new Thread (tg , checksToRun , "Threads for "+currentFile.getName()).start(); 
     } 

     // To complete the tasak ... asap ... want to start a new thread as soon as any of current thread ends (diff files diff sizes) 
     while(!fileQueue.isEmpty()) { 
      try { 
       Thread.sleep(10000); // Not sure If this will cause main thread to sleep (i think it will pause current thread) i want to pause main thread 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
      // check processing of how many files completed 
      if((tg.activeCount()<numberOfThreads) && (fileQueue.size()>0)) { 
       int numOfThreadsToStart = numberOfThreads - tg.activeCount(); 
       for(int var1=0;var1<numOfThreadsToStart;var1++) { 
        File currentFile = fileQueue.remove(); 
        ExecuteAllchecks checksToRun = new ExecuteAllchecks(); 
        checksToRun.setchecksToExecute(checksToExecute); 
        checksToRun.setcheckResult(fileWiseResult); // when each check finishes want to store result here 
        new Thread (tg , checksToRun , "Threads for "+currentFile.getName()).start(); 
       } 
      } 
     } 
    } 
} 

class ExecuteAllchecks implements Runnable { 

    private Collection<ABCCheck> checksToExecute; 
    private ConcurrentLinkedQueue<Result> checkResult; // not sure if its correct , i want to store result off all threads here 

    public ConcurrentLinkedQueue<Result> getcheckResult() { 
     return checkResult; 
    } 

    // plan to instantiate the result collection globally and store result here 
    public void setcheckResult(ConcurrentLinkedQueue<Result> checkResult) { 
     this.checkResult = checkResult; 
    } 

    public Collection<ABCCheck> getchecksToExecute() { 
     return checksToExecute; 
    } 

    public void setchecksToExecute(Collection<ABCCheck> checksToExecute) { 
     this.checksToExecute = checksToExecute; 
    } 



    @Override 
    public void run() { 
     Result currentFileResult = new Result(); 
     // TODO Auto-generated method stub 
     System.out.println("Execute All checks for 1 file"); 
     // each check runs and calls setters on currentFileResult 
     checkResult.add(currentFileResult); 
    } 

} 
+4

哇。你真的應該看看'ExecutorService'類。我認爲從提交的'Callable'類返回結果的服務會更好。 http://docs.oracle.com/javase/tutorial/essential/concurrency/exinter.html – Gray

+2

我也不知道你是否需要一個'ExecutorCompletionService'或不。 http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorCompletionService.html – Gray

+1

我用於類似的任務不同的更簡單的方法:主線程啓動跑步者並傳遞他們與文件隊列,然後加入他們要等到他們退出。每個跑步者從隊列輪詢一個文件,處理它,輪詢另一個進程......直到隊列爲空。當隊列爲空時,跑步者完成run()方法,以便完成,當所有跑步者完成時,主線程繼續。 – JosefN

回答

2

實際implem entation非常受計算本身的性質影響,但有些一般的方法可能是:

private final ExecutorService executor = Executors.newCachedThreadPool(); 
private final int taskCount = ...; 
private void process() { 
    Collection< Callable<Result> > tasks = new ArrayList<>(taskCount); 
    for(int i = 0; i < taskCount; i++) { 
     tasks.add(new Callable<Result>() { 

     @Override 
     public Result call() throws Exception { 
      // TODO implement your logic and return result 
      ... 
      return result; 
     } 

     }); 
    } 
    List< Future<Result> > futures = executor.invokeAll(tasks); 
    List<Result> results = new ArrayList<>(taskCount); 
    for(Future<Result> future : futures) { 
     results.add(future.get()); 
    } 
} 

我也建議使用上future.get()調用明智的超時,以執行線程不會卡住。

不過,我也我就不會建議使用緩存的線程池在生產,因爲這池不斷增加,只要當前池沒有足夠的能力爲所有的任務,而是使用類似Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())

我你實際任務可能會分裂成幾個小的,後來加入考慮檢查如何可以有效地使用ForkJoin framework

+2

好的答案,但我可能會在某處發佈結果,而不是等待所有結果完成。因此,如果文件1非常大,我已經得到了文件2的結果。 – Ishtar

+0

是的,我想過解鎖解決方案,唯一我能想到的將使用共享資源,但我仍然需要獲得所有結果(當然,如果沒有對獲得的結果進行額外的處理),您將需要等待與此解決方案相同的時間。這是與阻止方法,你會阻止最長的一個,但如果其他結果準備就緒,未來將立即返回。 – aljipa