2012-04-24 74 views
4

我正在尋找一種方法來執行java中的批處理任務。這個想法是基於線程池的ExecutorService,這將允許我從main線程的不同線程中傳播一組Callable。這個類應該提供一個waitForCompletion方法,它將使main線程進入休眠狀態,直到執行完所有任務。然後應喚醒線程,它將執行一些操作並重新提交一組任務。如何實現ExecutorService以執行批處理任務

此過程將重複多次,所以我想使用ExecutorService.shutdown,因爲這需要創建ExecutorService的多個實例。

目前我已經在使用AtomicInteger以下方式來實現它,和Lock/Condition

public class BatchThreadPoolExecutor extends ThreadPoolExecutor { 
    private final AtomicInteger mActiveCount; 
    private final Lock   mLock; 
    private final Condition  mCondition; 

    public <C extends Callable<V>, V> Map<C, Future<V>> submitBatch(Collection<C> batch){ 
    ... 
    for(C task : batch){ 
     submit(task); 
     mActiveCount.incrementAndGet(); 
    } 
    } 

    @Override 
    protected void afterExecute(Runnable r, Throwable t) { 
    super.afterExecute(r, t); 
    mLock.lock(); 
    if (mActiveCount.decrementAndGet() == 0) { 
     mCondition.signalAll(); 
    } 
    mLock.unlock(); 
    } 

    public void awaitBatchCompletion() throws InterruptedException { 
    ... 
    // Lock and wait until there is no active task 
    mLock.lock(); 
    while (mActiveCount.get() > 0) { 
     try { 
     mCondition.await(); 
     } catch (InterruptedException e) { 
     mLock.unlock(); 
     throw e; 
     } 
    } 
    mLock.unlock(); 
    } 
} 

請不,我不一定會從一批提交所有的任務一次完成,因此CountDownLatch做似乎不是一種選擇。

這是一個有效的方法嗎?有沒有更高效/優雅的方式來實現呢?

感謝

+0

你能解釋一下爲什麼默認的執行程序不能處理你的用例嗎?爲什麼你需要擴展'ThreadPoolExecutor'? – Gray 2012-04-24 12:49:53

+0

那麼API不指定一個方法來等待所有提交的任務完成,除非先調用'shutdown'。 在我的情況下,我不想關閉執行程序,因爲我幾乎會立即需要執行程序,並且這會導致無用的線程創建。它回答你的問題嗎? – 2012-04-24 13:00:11

+1

看到這個問題:http://stackoverflow.com/questions/3269445/executorservice-how-to-wait-for-all-tasks-to-finish/3269888 – andersoj 2012-04-24 13:35:49

回答

7

我認爲ExecutorService本身將能夠執行您的要求。

致電invokeAll([...])並重復執行所有任務。如果您可以遍歷所有期貨,所有任務都已完成。

+0

這是我第一次執行,問題在於'main '線程在提交任務時可能會被_interupted_(意味着某些東西會'中斷'循環的執行),因此我不能依賴'invokeAll'。 我可以在外部等待'Future.get',但我認爲在設計方面讓執行者對此負責。我可能錯了,但;) – 2012-04-24 13:08:56

+0

我認爲這個解決方案是最乾淨的。 'main'線程在休眠時可能同樣被打斷 - 你實際上是打算中斷它嗎,還是因爲'InterruptedException'被選中而出現這種情況? – artbristol 2012-04-24 13:30:29

+0

我沒有看到'invokeAll'在內部等待所有任務完成,這可能是最乾淨的解決方案,我將重構與主線程_interruption_相關的代碼(我不是在談論對Thread.interrupt的調用',在創建任務的循環中只是一個有條件的'beak') – 2012-04-24 13:43:46

0

我同意@ckuetbach的默認Java Executors應該爲您提供執行「批處理」作業所需的所有功能。

如果我是你,我只需提交一堆工作,等待他們完成ExecutorService.awaitTermination(),然後啓動一個新的ExecutorService。要做到這一點,以節省「線程創作」是不成熟的優化,除非你每秒做這個100秒的事情。

如果你真的被困在使用相同的ExecutorService爲每個批次,那麼你可以自己分配一個ThreadPoolExecutor,並在一個循環看ThreadPoolExecutor.getActiveCount()。喜歡的東西:

BlockingQueue jobQueue = new LinkedBlockingQueue<Runnable>(); 
ThreadPoolExecutor executor = new ThreadPoolExecutor(NUM_THREADS, NUM_THREADS, 
    0L, TimeUnit.MILLISECONDS, jobQueue); 
// submit your batch of jobs ... 
// need to wait a bit for the jobs to start 
Thread.sleep(100); 
while (executor.getActiveCount() > 0 && jobQueue.size() > 0) { 
    // to slow the spin 
    Thread.sleep(1000); 
} 
// continue on to submit the next batch 
+1

每次創建一個新的執行程序都會導致在30秒內出現大約240個新線程,我對此感覺不好:) 關於'executor.getActiveCount )API說它只有一個近似的計數,Thread.sleep對我來說不是一個好選擇,因爲我想盡可能快:我實現了一個組合優化算法,兩個性能指標是解決方案質量和速度,每個毫秒計數! – 2012-04-24 13:17:40

+0

不要。在30秒內240個新主題是沒有意義的。嘗試創建和銷燬線程的for循環。看看你能在30秒內完成多少次。 – Gray 2012-04-24 13:20:36

+0

考慮到您的速度要求,我會放棄自定義代碼,並且每次只創建一個新的執行程序。你不會後悔的。 – Gray 2012-04-24 13:21:19

3

至於其他的答案指出,似乎沒有爲你的使用情況中的任何一部分需要自定義的ExecutorService。

在我看來,你需要做的就是提交一個批處理,等待它們全部完成,同時忽略主線程中的中斷,然後根據第一批結果提交另一批。我相信這只是一個問題:

ExecutorService service = ...; 

    Collection<Future> futures = new HashSet<Future>(); 
    for (Callable callable : tasks) { 
     Future future = service.submit(callable); 
     futures.add(future); 
    } 

    for(Future future : futures) { 
     try { 
      future.get(); 
     } catch (InterruptedException e) { 
      // Figure out if the interruption means we should stop. 
     } 
    } 

    // Use the results of futures to figure out a new batch of tasks. 
    // Repeat the process with the same ExecutorService. 
+0

我會按照您的建議回退到之前的實施。 @ckuetbach你有這個解決方案的功勞 – 2012-04-24 13:37:48

相關問題