2011-07-20 144 views
12

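I have a ScheduledExecutorService with tasks scheduled to execute in an hour. How do I get the list of outstanding tasks so that I can force them to run immediately? How do I run outstanding tasks immediately after ExecutorService.shutdown()?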

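I believe shutdown() will wait the full hour. It also looks as if shutdownNow() returns a list of Runnables that cannot be run(), because the Runnable implementation checks the executor's state and refuses to run once it sees that the executor has shut down. See ScheduledThreadPoolExecutor.ScheduledFutureTask.run() for the actual implementation.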

Any ideas?
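To make the problem concrete, here is a minimal sketch of what I am seeing. The class and method names are made up for illustration, and the behaviour described in the comments is what I understand JDK 7 and later to do; I have not checked older releases.

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownNowDemo {
    /** Returns true if the task body ran when run() was invoked on what shutdownNow() returned. */
    static boolean taskRanAfterShutdownNow() {
        AtomicBoolean ran = new AtomicBoolean(false);
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        executor.schedule(() -> ran.set(true), 1, TimeUnit.HOURS);

        // shutdownNow() hands back the queued tasks without running them
        List<Runnable> pending = executor.shutdownNow();
        for (Runnable task : pending) {
            // These are the executor's own task wrappers, not the Runnable passed to schedule().
            // Their run() checks the executor state first and cancels the task instead of running it.
            task.run();
        }
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("task body ran: " + taskRanAfterShutdownNow());
    }
}
```

So calling run() on the returned list does not help: the original Runnable is hidden inside the wrapper.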

Answers

3

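I've taken Mark Peters' answer, implemented all the abstract methods, added thread safety, and tried to respect the underlying ScheduledThreadPoolExecutor configuration wherever possible.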

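import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.List;
import java.util.Map;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** 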
* Overrides shutdown() to run outstanding tasks immediately. 
* 
* @author Gili Tzabari 
*/ 
public class RunOnShutdownScheduledExecutorService extends AbstractExecutorService 
    implements ScheduledExecutorService 
{ 
    private final ScheduledExecutorService delegate; 
    private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; 
    private final ExecutorService immediateService; 
    private final ConcurrentMap<Future<?>, Callable<?>> tasks = Maps.newConcurrentMap(); 

    /** 
    * Creates a new RunOnShutdownScheduledExecutorService. 
    * 
    * @param delegate the executor to delegate to 
    */ 
    public RunOnShutdownScheduledExecutorService(ScheduledExecutorService delegate) 
    { 
     Preconditions.checkNotNull(delegate, "delegate may not be null"); 

     this.delegate = delegate; 
     if (delegate instanceof ScheduledThreadPoolExecutor) 
     { 
      this.scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor) delegate; 
      this.immediateService = Executors.newFixedThreadPool(scheduledThreadPoolExecutor. 
       getCorePoolSize(), scheduledThreadPoolExecutor.getThreadFactory()); 
     } 
     else 
     { 
      scheduledThreadPoolExecutor = null; 
      this.immediateService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder(). 
       setNameFormat(RunOnShutdownScheduledExecutorService.class.getName() + "-%d").build()); 
     } 
    } 

    @Override 
    public boolean isShutdown() 
    { 
     return delegate.isShutdown(); 
    } 

    @Override 
    public boolean isTerminated() 
    { 
     return delegate.isTerminated(); 
    } 

    @Override 
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException 
    { 
     long before = System.nanoTime(); 
     if (!delegate.awaitTermination(timeout, unit)) 
      return false; 
     long after = System.nanoTime(); 
     long timeLeft = timeout - unit.convert(after - before, TimeUnit.NANOSECONDS); 
     return immediateService.awaitTermination(timeLeft, unit); 
    } 

    @Override 
    public void execute(Runnable command) 
    { 
     delegate.execute(command); 
    } 

    @Override 
    public ScheduledFuture<?> schedule(final Runnable command, long delay, TimeUnit unit) 
    { 
     CleaningRunnable decorated = new CleaningRunnable(command); 
     ScheduledFuture<?> future = delegate.schedule(decorated, delay, unit); 
     decorated.setFuture(future); 
     tasks.put(future, Executors.callable(command)); 
     return new CleaningScheduledFuture<>(future); 
    } 

    @Override 
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) 
    { 
     CallableWithFuture<V> decorated = new CallableWithFuture<>(callable); 
     ScheduledFuture<V> future = delegate.schedule(decorated, delay, unit); 
     decorated.setFuture(future); 
     tasks.put(future, callable); 
     return new CleaningScheduledFuture<>(future); 
    } 

    @Override 
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, 
     TimeUnit unit) 
    { 
     CleaningRunnable decorated = new CleaningRunnable(command); 
     ScheduledFuture<?> future = delegate.scheduleAtFixedRate(decorated, initialDelay, period, unit); 
     decorated.setFuture(future); 
     tasks.put(future, Executors.callable(command)); 
     return new CleaningScheduledFuture<>(future); 
    } 

    @Override 
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, 
     TimeUnit unit) 
    { 
     CleaningRunnable decorated = new CleaningRunnable(command); 
     ScheduledFuture<?> future = 
      delegate.scheduleWithFixedDelay(decorated, initialDelay, delay, unit); 
     decorated.setFuture(future); 
     tasks.put(future, Executors.callable(command)); 
     return new CleaningScheduledFuture<>(future); 
    } 

    @Override 
    public synchronized void shutdown() 
    { 
     if (delegate.isShutdown()) 
      return; 
     if (scheduledThreadPoolExecutor != null) 
     { 
      // WORKAROUND: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=7069418 
      // 
      // Cancel waiting scheduled tasks, otherwise executor won't shut down 
      scheduledThreadPoolExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); 
     } 
     delegate.shutdown(); 
     // Users will not be able to cancel() Futures past this point so we're guaranteed that 
     // "tasks" will not be modified. 

     final List<Callable<?>> outstandingTasks = Lists.newArrayList(); 
     for (Map.Entry<Future<?>, Callable<?>> entry: tasks.entrySet()) 
     { 
      Future<?> future = entry.getKey(); 
      Callable<?> task = entry.getValue(); 

      if (future.isDone() && future.isCancelled()) 
      { 
       // Task cancelled by the underlying executor, not the user. See CleaningScheduledFuture. 
       outstandingTasks.add(task); 
      } 
     } 
     tasks.clear(); 
     if (outstandingTasks.isEmpty()) 
     { 
      immediateService.shutdown(); 
      return; 
     } 

     immediateService.submit(new Callable<Void>() 
     { 
      @Override 
      public Void call() throws Exception 
      { 
       delegate.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 

       // Execute outstanding tasks only after the delegate executor finishes shutting down 
       for (Callable<?> task: outstandingTasks) 
        immediateService.submit(task); 
       immediateService.shutdown(); 
       return null; 
      } 
     }); 
    } 

    @Override 
    public List<Runnable> shutdownNow() 
    { 
     return delegate.shutdownNow(); 
    } 

    /** 
    * A Runnable that removes its future when running. 
    */ 
    private class CleaningRunnable implements Runnable 
    { 
     private final Runnable delegate; 
     // Set by the scheduling thread after schedule() returns, read by the worker thread 
     private volatile Future<?> future; 

     /** 
     * Creates a new CleaningRunnable. 
     * 
     * @param delegate the Runnable to delegate to 
     * @throws NullPointerException if delegate is null 
     */ 
     public CleaningRunnable(Runnable delegate) 
     { 
      Preconditions.checkNotNull(delegate, "delegate may not be null"); 

      this.delegate = delegate; 
     } 

     /** 
     * Associates a Future with the runnable. 
     * 
     * @param future a future 
     */ 
     public void setFuture(Future<?> future) 
     { 
      this.future = future; 
     } 

     @Override 
     public void run() 
     { 
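      // The task may fire before setFuture() is called; a concurrent map rejects null keys 
      Future<?> key = future; 
      if (key != null) 
       tasks.remove(key); 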
      delegate.run(); 
     } 
    } 

    /** 
    * A Callable that removes its future when running. 
    */ 
    private class CallableWithFuture<V> implements Callable<V> 
    { 
     private final Callable<V> delegate; 
     // Set by the scheduling thread after schedule() returns, read by the worker thread 
     private volatile Future<V> future; 

     /** 
     * Creates a new CallableWithFuture. 
     * 
     * @param delegate the Callable to delegate to 
     * @throws NullPointerException if delegate is null 
     */ 
     public CallableWithFuture(Callable<V> delegate) 
     { 
      Preconditions.checkNotNull(delegate, "delegate may not be null"); 

      this.delegate = delegate; 
     } 

     /** 
     * Associates a Future with the callable. 
     * 
     * @param future a future 
     */ 
     public void setFuture(Future<V> future) 
     { 
      this.future = future; 
     } 

     @Override 
     public V call() throws Exception 
     { 
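      // The task may fire before setFuture() is called; a concurrent map rejects null keys 
      Future<V> key = future; 
      if (key != null) 
       tasks.remove(key); 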
      return delegate.call(); 
     } 
    } 

    /** 
    * A ScheduledFuture that removes its future when canceling. 
    * 
    * This allows us to differentiate between tasks canceled by the user and the underlying 
    * executor. Tasks canceled by the user are removed from "tasks". 
    * 
    * @param <V> The result type returned by this Future 
    */ 
    private class CleaningScheduledFuture<V> implements ScheduledFuture<V> 
    { 
     private final ScheduledFuture<V> delegate; 

     /** 
     * Creates a new CleaningScheduledFuture. 
     * 
     * @param delegate the future to delegate to 
     * @throws NullPointerException if delegate is null 
     */ 
     public CleaningScheduledFuture(ScheduledFuture<V> delegate) 
     { 
      Preconditions.checkNotNull(delegate, "delegate may not be null"); 

      this.delegate = delegate; 
     } 

     @Override 
     public long getDelay(TimeUnit unit) 
     { 
      return delegate.getDelay(unit); 
     } 

     @Override 
     public int compareTo(Delayed o) 
     { 
      return delegate.compareTo(o); 
     } 

     @Override 
     public boolean cancel(boolean mayInterruptIfRunning) 
     { 
      boolean result = delegate.cancel(mayInterruptIfRunning); 

      if (result) 
      { 
       // Tasks canceled by users are removed from "tasks" 
       tasks.remove(delegate); 
      } 
      return result; 
     } 

     @Override 
     public boolean isCancelled() 
     { 
      return delegate.isCancelled(); 
     } 

     @Override 
     public boolean isDone() 
     { 
      return delegate.isDone(); 
     } 

     @Override 
     public V get() throws InterruptedException, ExecutionException 
     { 
      return delegate.get(); 
     } 

     @Override 
     public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, 
      TimeoutException 
     { 
      return delegate.get(timeout, unit); 
     } 
    } 
} 
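I just discovered a nasty bug in ScheduledThreadPoolExecutor. If a worker thread is waiting on a task that is not due to run for an hour and you cancel that task, the worker thread keeps waiting and the executor does not shut down. I filed a bug report: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=7069418 – Gili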

I updated my answer with a workaround for bug #7069418 – Gili
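The workaround in the answer relies on setExecuteExistingDelayedTasksAfterShutdownPolicy(false). A minimal sketch of its effect in isolation follows; the class and method names are hypothetical, and the five-second timeout is an arbitrary choice for the demonstration.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DelayedTaskPolicyDemo {
    /** Returns true if the executor terminated promptly and the delayed task was cancelled by it. */
    static boolean shutsDownPromptly() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        ScheduledFuture<?> future = executor.schedule(() -> { }, 1, TimeUnit.HOURS);

        // With the default policy (true), shutdown() leaves the task queued until its delay elapses.
        // With false, shutdown() cancels queued delayed tasks and drops them from the queue.
        executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        executor.shutdown();

        try {
            boolean terminated = executor.awaitTermination(5, TimeUnit.SECONDS);
            return terminated && future.isCancelled();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

This is why shutdown() in the answer can tell the two cases apart: futures cancelled by the user have already been removed from "tasks", so anything still in the map and cancelled was cancelled by the executor.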

0

Great question! It looks like you may have to patch together a solution yourself.

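One option might be to wrap the ScheduledThreadPoolExecutor in your own ScheduledExecutorService implementation. When it is time to shut the service down, cancel every task that can still be cancelled and hand it to a second service that executes it immediately. Then shutdown() that second service.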

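Here is some very rough code that shows what I mean. Be warned that it may well have flaws, since I whipped it up in a few minutes. In particular, I have not put much effort into making it thread-safe.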

class RunOnShutdownScheduledExecutorService extends AbstractExecutorService implements ScheduledExecutorService { 
    private final ScheduledExecutorService delegateService; 

    private Map<Future<?>, Runnable> scheduledFutures = 
      Collections.synchronizedMap(new IdentityHashMap<Future<?>, Runnable>()); 


    public RunOnShutdownScheduledExecutorService(ScheduledExecutorService delegateService) { 
     this.delegateService = delegateService; 
    } 

    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { 
     ScheduledFuture<?> future = delegateService.schedule(command, delay, unit); 
     scheduledFutures.put(future, command); 
     return future; 
    } 

    public void shutdown() { 
     delegateService.shutdown(); 
     ExecutorService immediateService = Executors.newFixedThreadPool(5); 
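     // Collections.synchronizedMap requires manual locking while iterating 
     synchronized (scheduledFutures) { 
      for (Map.Entry<Future<?>, Runnable> entry : scheduledFutures.entrySet()) { 
       Future<?> future = entry.getKey(); 
       Runnable task = entry.getValue(); 
       // cancel() succeeds only for tasks that have not started yet 
       if (!future.isDone()) { 
        if (future.cancel(false)) { 
         immediateService.submit(task); 
        } 
       } 
      } 
     } 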
     immediateService.shutdown(); 
    } 

    //... 
} 
Another approach (discussed in the Javadoc for ScheduledThreadPoolExecutor) seems to be overriding decorateTask(). I believe this might lead to a simpler implementation. – Gili

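@Gili: That looks like a possibility. If it turns out to be workable, I suggest you post it as an answer to your own question. Would you still delegate to an immediate executor, or do you think you could modify the tasks at shutdown? –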

I don't think you can mix 'decorateTask()' with a delegating executor, but I'll give it more thought. – Gili
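For reference, a rough sketch of what the decorateTask() idea could look like when subclassing ScheduledThreadPoolExecutor directly instead of wrapping a delegate. This is not code from either answer: the class name, the shutdownAndRunPending() helper and the demo() method are hypothetical. It only tracks tasks scheduled as a Runnable (the Callable overload of decorateTask() is not overridden), and it runs the pending tasks on the calling thread.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class RunPendingOnShutdownExecutor extends ScheduledThreadPoolExecutor {
    // Maps each internal task wrapper back to the Runnable the caller scheduled
    private final Map<Runnable, Runnable> originals = new ConcurrentHashMap<>();

    RunPendingOnShutdownExecutor(int corePoolSize) {
        super(corePoolSize);
    }

    @Override
    protected <V> RunnableScheduledFuture<V> decorateTask(
            Runnable runnable, RunnableScheduledFuture<V> task) {
        originals.put(task, runnable); // remember the original, leave the task itself unchanged
        return task;
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // Forget one-shot tasks once they finish; periodic tasks are not "done" between runs
        if (r instanceof Future && ((Future<?>) r).isDone()) {
            originals.remove(r);
        }
    }

    /** Hypothetical helper: stops the executor, then runs still-queued one-shot tasks right here. */
    int shutdownAndRunPending() {
        int ran = 0;
        for (Runnable queued : shutdownNow()) {
            if (!(queued instanceof RunnableScheduledFuture)) {
                continue;
            }
            RunnableScheduledFuture<?> future = (RunnableScheduledFuture<?>) queued;
            Runnable original = originals.remove(queued);
            // Skip tasks the user already cancelled, and periodic tasks
            if (original != null && !future.isCancelled() && !future.isPeriodic()) {
                original.run();
                ran++;
            }
            future.cancel(false); // releases callers blocked in get() on the never-run wrapper
        }
        originals.clear();
        return ran;
    }

    /** Schedules two tasks an hour out, cancels a third, and returns how many bodies ran at shutdown. */
    static int demo() {
        AtomicInteger counter = new AtomicInteger();
        Runnable increment = () -> counter.incrementAndGet();
        RunPendingOnShutdownExecutor executor = new RunPendingOnShutdownExecutor(1);
        executor.schedule(increment, 1, TimeUnit.HOURS);
        executor.schedule(increment, 1, TimeUnit.HOURS);
        executor.schedule(increment, 1, TimeUnit.HOURS).cancel(false);
        executor.shutdownAndRunPending();
        return counter.get();
    }
}
```

One trade-off of this sketch: the ScheduledFuture handed to the caller ends up cancelled even though the task body ran, so anything that depends on the future's result would need a different design.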