2012-06-07 140 views
25

我的問題:如何在ThreadPoolExecutor上執行一堆線程對象並等待它們全部完成後才能繼續?如何等待ThreadPoolExecutor完成

我是ThreadPoolExecutor的新手。所以這段代碼是一個測試它是如何工作的。現在我甚至沒有填寫對象BlockingQueue,因爲我不知道如何在不調用​​與另一個RunnableObject的情況下啓動隊列。無論如何,現在我只需撥打awaitTermination(),但我認爲我仍然錯過了一些東西。任何提示都會很棒!謝謝。

public void testThreadPoolExecutor() throws InterruptedException { 
    int limit = 20; 
    BlockingQueue q = new ArrayBlockingQueue(limit); 
    ThreadPoolExecutor ex = new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q); 
    for (int i = 0; i < limit; i++) { 
    ex.execute(new RunnableObject(i + 1)); 
    } 
    ex.awaitTermination(2, TimeUnit.SECONDS); 
    System.out.println("finished"); 
} 

的RunnableObject類:

package playground; 

public class RunnableObject implements Runnable { 

    private final int id; 

    public RunnableObject(int id) { 
    this.id = id; 
    } 

    @Override 
    public void run() { 
    System.out.println("ID: " + id + " started"); 
    try { 
     Thread.sleep(2354); 
    } catch (InterruptedException ignore) { 
    } 
    System.out.println("ID: " + id + " ended"); 
    } 
} 
+1

,我不認爲這是一個好主意,回答你的問題,你張貼和建議後,立即您自己的答案不太合適。充其量,這保證和更新你的原始問題,解釋爲什麼這個答案是不夠的。 – Kiril

+1

@Link,你是對的。我會解決它。 – kentcdodds

+0

我剛剛編輯了我的答案,以顯示在關閉執行程序之前等待所有作業完成的一種方式。 – Gray

回答

43

你應該awaitTermination

ExecutorService threads; 
// ... 
// Tell threads to finish off. 
threads.shutdown(); 
// Wait for everything to finish. 
while (!threads.awaitTermination(10, TimeUnit.SECONDS)) { 
    log.info("Awaiting completion of threads."); 
} 
+0

完全做到了。我沒有注意到'awaitTermination'返回任何東西。沒有其他人提到我需要通過循環來阻止。謝謝! – kentcdodds

+9

爲什麼在一個循環?爲什麼不增加等待的秒數?通常我會執行一個'awaitTermination(Long.MAX_VALUE,TimeUnit.SECONDS)'或其他。 – Gray

+1

@Gray - 我最初只是在我的一段代碼中調用過它,或者有時候出現了一些非常奇怪的事情,或者有時候一切都被鎖定了。最終我追蹤到這一點,所以現在我循環並記錄一個「等待」消息,以便我知道發生了一些奇怪的事情。 – OldCurmudgeon

4

您的問題似乎是,你是不是您所提交的所有作業,以您的池後,調用shutdown。如果沒有shutdown(),您的awaitTermination將始終返回false。

ThreadPoolExecutor ex = 
    new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q); 
for (int i = 0; i < limit; i++) { 
    ex.execute(new RunnableObject(i + 1)); 
} 
// you are missing this line!! 
ex.shutdown(); 
ex.awaitTermination(2, TimeUnit.SECONDS); 

你也可以做類似以下內容等待您的所有作業完成:

List<Future<Object>> futures = new ArrayList<Future<Object>>(); 
for (int i = 0; i < limit; i++) { 
    futures.add(ex.submit(new RunnableObject(i + 1), (Object)null)); 
} 
for (Future<Object> future : futures) { 
    // this joins with the submitted job 
    future.get(); 
} 
... 
// still need to shutdown at the end 
ex.shutdown(); 

而且,因爲你在睡覺的2354毫秒,但只有等待所有的終止2SECONDS,awaitTermination的作業將始終返回false

最後,這聽起來像你正在擔心創建一個新的ThreadPoolExecutor,而你卻想重用第一個。不要。與您編寫的用於檢測作業是否完成的任何代碼相比,GC開銷將極小。


從的javadoc引用,ThreadPoolExecutor.shutdown()

發起在以前已提交任務的執行一個有序的關閉,但沒有新的任務將被接受。如果已關閉,調用沒有其他影響。

ThreadPoolExecutor.awaitTermination(...)方法,它正在等待執行的狀態去TERMINATED。但是,如果調用shutdown(),首先狀態必須轉到SHUTDOWN,如果調用shutdownNow(),則狀態必須轉到STOP

3

這與執行者本身沒有任何關係。只需使用界面的java.util.concurrent.ExecutorService.invokeAll(Collection<? extends Callable<T>>)即可。它會阻止,直到所有的Callable完成。

執行者意味着長壽;超出一組任務的整個生命週期。 shutdown適用於應用程序完成並清理完畢。

+0

你能詳細解釋一下這個答案嗎?我很喜歡這個答案,但我很難實現它。謝謝 – kentcdodds

-1

試試這個,

ThreadPoolExecutor ex = 
    new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q); 
for (int i = 0; i < limit; i++) { 
    ex.execute(new RunnableObject(i + 1)); 
} 

行添加

ex.shutdown(); 
ex.awaitTermination(timeout, unit) 
+0

你應該檢查從等待終止返回的布爾值並繼續嘗試,直到你成爲true。 –

1

另一種方法循環是使用CompletionService,如果你有非常有用嘗試任何任務結果:

//run 3 task at time 
final int numParallelThreads = 3; 

//I used newFixedThreadPool for convenience but if you need you can use ThreadPoolExecutor 
ExecutorService executor = Executors.newFixedThreadPool(numParallelThreads); 
CompletionService<String> completionService = new ExecutorCompletionService<String>(executor); 

int numTaskToStart = 15; 

for(int i=0; i<numTaskToStart ; i++){ 
    //task class that implements Callable<String> (or something you need) 
    MyTask mt = new MyTask(); 

    completionService.submit(mt); 
} 

executor.shutdown(); //it cannot be queued more task 

try { 
    for (int t = 0; t < numTaskToStart ; t++) { 
     Future<String> f = completionService.take(); 
     String result = f.get(); 
     // ... something to do ... 
    } 
} catch (InterruptedException e) { 
    //termination of all started tasks (it returns all not started tasks in queue) 
    executor.shutdownNow(); 
} catch (ExecutionException e) { 
    // ... something to catch ... 
} 
2

這裏有公認的答案,處理重變體如果/當InterruptedException異常被拋出:

executor.shutdown(); 

boolean isWait = true; 

while (isWait) 
{ 
    try 
    {    
     isWait = !executor.awaitTermination(10, TimeUnit.SECONDS); 
     if (isWait) 
     { 
      log.info("Awaiting completion of bulk callback threads."); 
     } 
    } catch (InterruptedException e) { 
     log.debug("Interruped while awaiting completion of callback threads - trying again..."); 
    } 
}