我正在尋找一種方法來執行java中的批處理任務。這個想法是基於線程池的ExecutorService
,這將允許我從main
線程的不同線程中傳播一組Callable
。這個類應該提供一個waitForCompletion方法,它將使main
線程進入休眠狀態,直到執行完所有任務。然後應喚醒線程,它將執行一些操作並重新提交一組任務。如何實現ExecutorService以執行批處理任務
此過程將重複多次,所以我想使用ExecutorService.shutdown
,因爲這需要創建ExecutorService
的多個實例。
目前我已經在使用AtomicInteger
以下方式來實現它,和Lock
/Condition
:
public class BatchThreadPoolExecutor extends ThreadPoolExecutor {
private final AtomicInteger mActiveCount;
private final Lock mLock;
private final Condition mCondition;
public <C extends Callable<V>, V> Map<C, Future<V>> submitBatch(Collection<C> batch){
...
for(C task : batch){
submit(task);
mActiveCount.incrementAndGet();
}
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
mLock.lock();
if (mActiveCount.decrementAndGet() == 0) {
mCondition.signalAll();
}
mLock.unlock();
}
public void awaitBatchCompletion() throws InterruptedException {
...
// Lock and wait until there is no active task
mLock.lock();
while (mActiveCount.get() > 0) {
try {
mCondition.await();
} catch (InterruptedException e) {
mLock.unlock();
throw e;
}
}
mLock.unlock();
}
}
請不,我不一定會從一批提交所有的任務一次完成,因此CountDownLatch
做似乎不是一種選擇。
這是一個有效的方法嗎?有沒有更高效/優雅的方式來實現呢?
感謝
你能解釋一下爲什麼默認的執行程序不能處理你的用例嗎?爲什麼你需要擴展'ThreadPoolExecutor'? – Gray 2012-04-24 12:49:53
那麼API不指定一個方法來等待所有提交的任務完成,除非先調用'shutdown'。 在我的情況下,我不想關閉執行程序,因爲我幾乎會立即需要執行程序,並且這會導致無用的線程創建。它回答你的問題嗎? – 2012-04-24 13:00:11
看到這個問題:http://stackoverflow.com/questions/3269445/executorservice-how-to-wait-for-all-tasks-to-finish/3269888 – andersoj 2012-04-24 13:35:49