1

我想用ExecutorService及其函數invokeAll用Java編寫程序。我的問題是:invokeAll函數是否同時解決任務?我的意思是,如果我有兩個處理器,那麼同時會有兩名工人?因爲我不能讓它正確地縮放。這需要在同一時間完成的問題,如果我給newFixedThreadPool(2)或1Java ExecutorService - 縮放

List<Future<PartialSolution>> list = new ArrayList<Future<PartialSolution>>(); 
Collection<Callable<PartialSolution>> tasks = new ArrayList<Callable<PartialSolution>>(); 
for(PartialSolution ps : wp) 
{ 
    tasks.add(new Map(ps, keyWords)); 
} 
list = executor.invokeAll(tasks); 

Map是實現Callablewp是部分解決方案的一個載體,保存在不同時代的一些信息類的類。

它爲什麼不縮放?可能是什麼問題呢?

這是PartialSolution的代碼:

import java.util.HashMap; 
import java.util.Vector; 

public class PartialSolution 
{ 
    public String fileName;//the name of a file 
    public int b, e;//the index of begin and end of the fragment from the file 
    public String info;//the fragment 
    public HashMap<String, Word> hm;//here i retain the informations 
    public HashMap<String, Vector<Word>> hmt;//this i use for the final reduce 

    public PartialSolution(String name, int b, int e, String i, boolean ok) 
    { 
     this.fileName = name; 
     this.b = b; 
     this.e = e; 
     this.info = i; 
     hm = new HashMap<String, Word>(); 
     if(ok == true) 
     { 
      hmt = new HashMap<String, Vector<Word>>(); 
     } 
     else 
     { 
      hmt = null; 
     }  
    } 
} 

的這對地圖的代碼:

public class Map implements Callable<PartialSolution> 
{ 
    private PartialSolution ps; 
    private Vector<String> keyWords; 

    public Map(PartialSolution p, Vector<String> kw) 
    { 
     this.ps = p; 
     this.keyWords = kw; 
    } 

    @Override 
    public PartialSolution call() throws Exception 
    { 
     String[] st = this.ps.info.split("\\n"); 
     for(int j = 0 ; j < st.length ; j++) 
     { 
      for(int i = 0 ; i < keyWords.size() ; i++) 
      { 
       if(keyWords.elementAt(i).charAt(0) != '\'') 
       { 
        int k = 0; 
        int index = 0; 
        int count = 0; 

        while((index = st[j].indexOf(keyWords.elementAt(i), k)) != -1) 
        { 
         k = index + keyWords.elementAt(i).length(); 
         count++; 
        } 
        if(count != 0) 
        { 
         Word wr = this.ps.hm.get(keyWords.elementAt(i)); 
         if(wr != null) 
         { 
          Word nw = new Word(ps.fileName); 
          nw.nrap = wr.nrap + count; 
          nw.lines = wr.lines; 
          int grep = count; 
          while(grep > 0) 
          { 
           nw.lines.addElement(ps.b + j); 
           grep--; 
          } 
          this.ps.hm.put(keyWords.elementAt(i), nw); 
         } 
         else 
         { 
          Word nw = new Word(ps.fileName); 
          nw.nrap = count; 
          int grep = count; 
          while(grep > 0) 
          { 
           nw.lines.addElement(ps.b + j); 
           grep--; 
          } 
          this.ps.hm.put(keyWords.elementAt(i), nw); 
         } 
        } 
       } 
       else 
       { 
        String regex = keyWords.elementAt(i).substring(1, keyWords.elementAt(i).length() - 1); 
        StringBuffer sb = new StringBuffer(regex); 
        regex = sb.toString(); 
        Pattern pt = Pattern.compile(regex); 
        Matcher m = pt.matcher(st[j]); 
        int count = 0; 
        while(m.find()) 
        { 
         count++; 
        } 
        if(count != 0) 
        { 
         Word wr = this.ps.hm.get(keyWords.elementAt(i)); 
         if(wr != null) 
         { 
          Word nw = new Word(this.ps.fileName); 
          nw.nrap = wr.nrap + count; 
          nw.lines = wr.lines; 
          int grep = count; 
          while(grep > 0) 
          { 
           nw.lines.addElement(ps.b + j); 
           grep--; 
          } 
          this.ps.hm.put(keyWords.elementAt(i), nw); 
         } 
         else 
         { 
          Word nw = new Word(this.ps.fileName); 
          nw.nrap = count; 
          int grep = count; 
          while(grep > 0) 
          { 
           nw.lines.addElement(ps.b + j); 
           grep--; 
          } 
          this.ps.hm.put(keyWords.elementAt(i), nw); 
         } 
        } 
       } 
      } 
     } 
     this.ps.info = null; 
     return this.ps; 
    } 
} 

所以在地圖我從該片段的每一行,並搜索每個表達式數的外觀,我也保存線的數量。在我處理完所有片段後,在同一個PartialSolution中,我將這些信息保存在散列圖中並返回新的PartialSolution。在下一步中,我將PartialSolutions與相同的fileName結合起來,並將它們引入Callable類Reduce中,它與map相同,不同之處在於它可以進行其他操作,但也返回PartialSolution。

這是跑地圖的任務代碼:

List<Future<PartialSolution>> list = new ArrayList<Future<PartialSolution>>(); 
Collection<Callable<PartialSolution>> tasks = new ArrayList<Callable<PartialSolution>>(); 
for(PartialSolution ps : wp) 
{ 
    tasks.add(new Map(ps, keyWords)); 
}  
list = executor.invokeAll(tasks); 

在任務我創建類型地圖的任務,並在列表中我得到他們。我不知道如何讀取JVM線程轉儲。我希望能夠提供給我的信息。如果有幫助,我在NetBeans 7.0.1中工作。

謝謝 亞歷

+0

多少任務,你呢?他們做什麼?有很多I/O? – Thilo

+0

我的任務是那些可調用的類,它們使用PartialSolution,它有一些文本並計算一個單詞出現文本和行的次數。 PartialSolution實際上是文本的一部分,我希望爲每個部分獲取這些信息,然後將它們與另一個名爲Reduce的Callable類聯合起來。我想同時處理這些零件。取決於我擁有的處理器數量。 I/O將在最後,當我將所有任務和10個部分聯合起來說,並將有一個關於該文件的所有信息。 Google使用的是MapReduce。 –

+0

我想知道的是,如果方法invokeAll,如果我創建了10個線程的ExcutorService,將同時解決10個任務或一次解決一個任務?在Map中我有一個構造函數,並實現了函數call(),該函數返回另一個PartialSolution,但是這次是正確的信息。還有一個問題,如果我說list.get(i).get()這將返回PartialSolution解決之後賴特? 我真的不明白爲什麼不改善時間,如果我使用2線程而不是1.爲什麼它不縮放賴特? –

回答

2

我想知道的是如果方法的invokeAll,如果我有10個線程創建的ExcutorService,將解決10個任務在同一時間或將解決一次一個?

如果您向10個線程的ExecutorService提交10個任務,它將同時運行它們。他們是否可以完全平行並獨立於彼此取決於他們在做什麼。但他們每個人都有自己的線索。

另一個問題,如果我說list.get(i).get()這將返回PartialSolution解決後?

是的,它會阻塞直到計算完成(如果還沒有完成)並返回結果。

我真不明白的時候爲什麼不提高,如果我用2個線程,而不是1

我們需要看到更多的代碼。他們是否在某些共享數據上同步?這些任務需要多長時間?如果他們很短,你可能不會注意到任何差異。如果它們花費更長時間,請查看JVM線程轉儲以驗證它們全部正在運行。

+2

+1。但有一個錯誤:invokeAll返回已完成期貨的清單。換句話說:只有當所有任務完成後纔會返回。 –

0

如果使用兩個線程創建線程池,那麼兩個任務將同時運行。

我發現有兩件事情可能導致兩個線程花費與一個線程相同的時間。

如果只有一個Map任務佔用大部分時間,那麼額外的線程將不會使該任務運行得更快。它不能比最慢的工作完成得更快。

另一種可能性是您的地圖任務經常從共享矢量中讀取。這可能會導致足夠的爭用來取消擁有兩個線程的收益。

你應該在jvisualvm中看看每個線程正在做什麼。

+0

我已經安裝了VisualVM,但我不知道如何使用它,我的意思是我不知道要看,如何讀取數據。請幫助一些。 –

+0

我已經做了這個步驟:Profiler - > CPU - >右鍵單擊然後線程轉儲...但我不明白的事情。 –

+0

@ StanciuAlexandru-Marian我建議使用ThreadFactory命名你的線程有意義的東西。然後找到線程列表中的線程。然後檢查代碼運行時每個線程的狀態如何變化。這會給你一個每個線程正在做多少工作的指示。如果一個線程正在等待,則可以執行線程轉儲以查看它正在等待什麼。 –

0

Java 8在Executors - newWorkStealingPool中引入了更多的API來創建工作搶佔池。您不必創建RecursiveTaskRecursiveAction,但仍可以使用ForkJoinPool

public static ExecutorService newWorkStealingPool() 

創建使用所有可用的處理器作爲其目標並行水平工作竊取線程池。

默認情況下,它將以CPU內核數量作爲並行性參數。如果您有核心CPU,則可以有8個線程處理工作任務隊列。

Work stealing of idle worker threads from busy worker threads improves overall performance. Since task queue is unbounded in nature, this ForkJoinPool is recommended for the tasks executing in short time intervals.

無論ExecutorServiceForkJoinPoolThreadPoolExecutor表現會好,如果你沒有共享數據和共享鎖定(同步)和線程間通信。如果任務隊列中的所有任務都是相互獨立的,則性能會得到提高。

ThreadPoolExecutor構造定製和任務控制工作流程:

ThreadPoolExecutor(int corePoolSize, 
         int maximumPoolSize, 
         long keepAliveTime, 
         TimeUnit unit, 
         BlockingQueue<Runnable> workQueue, 
         ThreadFactory threadFactory, 
         RejectedExecutionHandler handler) 

看一看相關SE的問題:

How to properly use Java Executor?

Java's Fork/Join vs ExecutorService - when to use which?