2017-07-29 57 views
3

當在任何 ScheduledExecutorService 上以固定頻率(fixed rate)調度命令時,它會返回一個可以取消的 ScheduledFuture。但是,「cancel」並不能保證在它返回之後命令不再處於執行狀態,例如調用「cancel」時命令可能正執行到一半。如果在取消的時刻 Runnable 正在運行,如何取消 ScheduledFuture 並等待該 Runnable 停止?

對於大多數使用場景,這樣的功能已經足夠。但是我遇到了這樣的用例:如果取消時命令已在執行中,就需要阻塞當前線程,直到命令完成。換句話說,如果命令仍在執行,調用 cancel 的線程不應該繼續前進。用 mayInterruptIfRunning = true 取消也不合適,因爲我不想中斷當前的執行,我只需要等待它正常完成。

我沒有找到通過標準 JDK 類實現此要求的方法。問題1:是我弄錯了嗎?這種功能是否已經存在?

所以我決定自行實現它:

import java.util.concurrent.*;

/**
 * Decorates the {@link ScheduledFuture} of a fixed-rate task so that {@link #cancel(boolean)}
 * returns only when no execution of the task is in progress.
 *
 * <p>Every other {@code ScheduledFuture} method is delegated unchanged to the future returned
 * by the scheduler.
 *
 * <p><b>Deadlock warning:</b> the task body runs while holding an internal monitor, and
 * {@code cancel} acquires that same monitor. If the thread calling {@code cancel} holds a lock
 * that the running task is waiting for, both threads block forever.
 */
public class GracefullyStoppingScheduledFutureDecorator implements ScheduledFuture<Object> {

    /** Future handed out by the scheduler; all delegated calls go to it. */
    private final ScheduledFuture<?> targetFuture;

    /** Wrapper around the user's task that lets {@code cancel} wait for a running execution. */
    private final CancellableCommand runnable;

    private GracefullyStoppingScheduledFutureDecorator(ScheduledFuture<?> targetFuture, CancellableCommand command) {
        this.targetFuture = targetFuture;
        this.runnable = command;
    }

    /**
     * Schedules {@code command} at a fixed rate on {@code scheduler}.
     *
     * @param command      the task to execute periodically
     * @param initialDelay the delay before the first execution
     * @param period       the period between successive executions
     * @param unit         the time unit of {@code initialDelay} and {@code period}
     * @param scheduler    the executor that runs the task
     * @return a scheduled future whose {@code cancel} method, in addition to the standard
     *         behaviour, guarantees that the command is not in the middle of an execution
     *         when {@code cancel} returns
     * @throws NullPointerException if {@code command} is null
     */
    public static ScheduledFuture<?> schedule(Runnable command, long initialDelay, long period, TimeUnit unit, ScheduledExecutorService scheduler) {
        if (command == null) {
            // Fail fast in the caller's thread instead of inside the executor on the first run.
            throw new NullPointerException("command");
        }
        CancellableCommand cancellableCommand = new CancellableCommand(command);
        ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(cancellableCommand, initialDelay, period, unit);
        return new GracefullyStoppingScheduledFutureDecorator(future, cancellableCommand);
    }

    /**
     * Cancels the periodic task and then blocks until any execution that is currently in
     * progress has finished.
     *
     * <p>The underlying future is cancelled first so that {@code mayInterruptIfRunning} is
     * actually delivered to a running execution; waiting first would make the flag a no-op
     * and would hang forever on a task that only stops when interrupted.
     *
     * @param mayInterruptIfRunning passed through to the underlying future
     * @return the result of cancelling the underlying future
     */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        try {
            return targetFuture.cancel(mayInterruptIfRunning);
        } finally {
            // Always wait for the in-flight execution, even if the delegate threw.
            runnable.cancel();
        }
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return targetFuture.getDelay(unit);
    }

    @Override
    public int compareTo(Delayed o) {
        return targetFuture.compareTo(o);
    }

    @Override
    public boolean isCancelled() {
        return targetFuture.isCancelled();
    }

    @Override
    public boolean isDone() {
        return targetFuture.isDone();
    }

    @Override
    public Object get() throws InterruptedException, ExecutionException {
        return targetFuture.get();
    }

    @Override
    public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return targetFuture.get(timeout, unit);
    }

    /**
     * Runs the wrapped task under a monitor so that {@link #cancel()} cannot return while an
     * execution is in progress, and turns every execution after cancellation into a no-op.
     */
    private static class CancellableCommand implements Runnable {

        private final Object monitor = new Object();
        private final Runnable target;

        /** Read and written only while holding {@link #monitor}. */
        private boolean cancelled = false;

        private CancellableCommand(Runnable target) {
            this.target = target;
        }

        /**
         * Marks the command as cancelled. Blocks while another thread is inside {@link #run()};
         * a call made by the task itself does not block because the monitor is reentrant.
         */
        public void cancel() {
            synchronized (monitor) {
                cancelled = true;
            }
        }

        @Override
        public void run() {
            synchronized (monitor) {
                if (!cancelled) {
                    target.run();
                }
            }
        }
    }
}

問題2:有人能發現上面代碼中的錯誤嗎?

回答

2

問題2:有人能發現上面代碼中的錯誤嗎?

存在一個可以通過以下場景重現的假想死鎖:

  1. 線程 T1 持有監視器 M1。
  2. 計劃任務在線程 T2 上執行(持有它自己的監視器 M2),並希望進入 M1,所以 T2 需要等待 T1 退出監視器 M1。
  3. T1 決定取消任務,但由於監視器 M2 被任務本身鎖定,於是發生死鎖。

上述情況很可能只是假想的,但爲了防範所有可能的情況,我決定以無鎖的方式重寫代碼:

/**
 * Handle for a fixed-rate task whose cancellation waits for an in-progress execution to
 * finish, implemented with atomics instead of a monitor.
 *
 * <p>The cancelling thread waits on a {@link CompletableFuture} rather than on a lock held by
 * the task, and a cancellation issued by the task itself returns immediately.
 */
public class GracefullyStoppingScheduledFuture {

    /** Future handed out by the scheduler. */
    private final ScheduledFuture<?> targetFuture;

    /** Wrapper around the user's task that tracks execution and cancellation state. */
    private final CancellableCommand runnable;

    private GracefullyStoppingScheduledFuture(ScheduledFuture<?> targetFuture, CancellableCommand command) {
        this.targetFuture = targetFuture;
        this.runnable = command;
    }

    /**
     * Schedules {@code command} at a fixed rate on {@code scheduler}.
     *
     * @param command      the task to execute periodically
     * @param initialDelay the delay before the first execution
     * @param period       the period between successive executions
     * @param unit         the time unit of {@code initialDelay} and {@code period}
     * @param scheduler    the executor that runs the task
     * @return a handle whose {@link #cancelAndBeSureOfTermination(boolean)} guarantees that the
     *         command is not in the middle of an execution when it returns
     */
    public static GracefullyStoppingScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit, ScheduledExecutorService scheduler) {
        CancellableCommand cancellableCommand = new CancellableCommand(command);
        ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(cancellableCommand, initialDelay, period, unit);
        return new GracefullyStoppingScheduledFuture(future, cancellableCommand);
    }

    /**
     * Misspelled predecessor of
     * {@link #scheduleAtFixedRate(Runnable, long, long, TimeUnit, ScheduledExecutorService)},
     * kept so existing callers keep compiling.
     *
     * @deprecated use {@code scheduleAtFixedRate} instead
     */
    @Deprecated
    public static GracefullyStoppingScheduledFuture cheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit, ScheduledExecutorService scheduler) {
        return scheduleAtFixedRate(command, initialDelay, period, unit, scheduler);
    }

    /**
     * Cancels the periodic task and, if an execution is in progress on another thread, blocks
     * until that execution has finished.
     *
     * @param mayInterruptIfRunning passed through to the underlying future
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException   declared by the wait on the internal completion future;
     *                              this class only ever completes that future normally
     */
    public void cancelAndBeSureOfTermination(boolean mayInterruptIfRunning) throws InterruptedException, ExecutionException {
        try {
            targetFuture.cancel(mayInterruptIfRunning);
        } finally {
            // Wait for the in-flight execution even if the delegate threw.
            runnable.cancel();
        }
    }

    /**
     * Wraps the user's task with a four-state machine:
     * NOT_EXECUTING and IN_PROGRESS alternate while the task is live; a cancellation moves the
     * state to CANCELLED_WITHOUT_OBSTRUCTION (nothing was running) or
     * CANCELLED_IN_MIDDLE_OF_PROGRESS (an execution was running and waiters must be notified).
     */
    private static class CancellableCommand implements Runnable {

        private static final int NOT_EXECUTING = 0;
        private static final int IN_PROGRESS = 1;
        private static final int CANCELLED_WITHOUT_OBSTRUCTION = 2;
        private static final int CANCELLED_IN_MIDDLE_OF_PROGRESS = 3;

        private final AtomicInteger state = new AtomicInteger(NOT_EXECUTING);

        /** Thread currently inside {@code target.run()}, or null; used to detect self-cancellation. */
        private final AtomicReference<Thread> executionThread = new AtomicReference<>();

        /** Completed by the executing thread once a cancelled-while-running execution ends. */
        private final CompletableFuture<Void> cancellationFuture = new CompletableFuture<>();

        private final Runnable target;

        private CancellableCommand(Runnable target) {
            this.target = target;
        }

        /**
         * Moves the command to a cancelled state and waits for a concurrent execution, if any.
         *
         * @throws ExecutionException   declared by {@code cancellationFuture.get()}
         * @throws InterruptedException if interrupted while waiting for the execution to end
         */
        public void cancel() throws ExecutionException, InterruptedException {
            if (executionThread.get() == Thread.currentThread()) {
                // Cancel was called by the task itself: waiting here would wait on ourselves.
                state.set(CANCELLED_IN_MIDDLE_OF_PROGRESS);
                return;
            }
            while (true) {
                switch (state.get()) {
                    case CANCELLED_WITHOUT_OBSTRUCTION:
                        return;
                    case CANCELLED_IN_MIDDLE_OF_PROGRESS:
                        cancellationFuture.get();
                        return;
                    case NOT_EXECUTING:
                        if (state.compareAndSet(NOT_EXECUTING, CANCELLED_WITHOUT_OBSTRUCTION)) {
                            return;
                        }
                        break; // lost a race with run(); re-read the state
                    case IN_PROGRESS:
                        if (state.compareAndSet(IN_PROGRESS, CANCELLED_IN_MIDDLE_OF_PROGRESS)) {
                            cancellationFuture.get();
                            return;
                        }
                        break; // lost a race with run() finishing; re-read the state
                    default:
                        break;
                }
            }
        }

        @Override
        public void run() {
            if (!state.compareAndSet(NOT_EXECUTING, IN_PROGRESS)) {
                // Already cancelled: skip the task, but release anyone still waiting.
                notifyWaiters();
                return;
            }

            try {
                executionThread.set(Thread.currentThread());
                target.run();
            } finally {
                executionThread.set(null);
                if (!state.compareAndSet(IN_PROGRESS, NOT_EXECUTING)) {
                    // The state changed under us, i.e. a cancellation arrived mid-execution.
                    notifyWaiters();
                }
            }
        }

        /** Completes the cancellation future unless the cancellation never had to wait. */
        private void notifyWaiters() {
            if (state.get() == CANCELLED_WITHOUT_OBSTRUCTION) {
                // Nobody can be waiting in this state.
                return;
            }
            cancellationFuture.complete(null);
        }
    }
}
相關問題