2012-10-19 65 views
3

考慮接受需要很長時間初始化的服務(例如,JDBC連接的參數)的服務的配置設置的用戶界面。我們希望我們的用戶界面在服務初始化發生時保持響應。如果用戶進行其他更改,則應使用新參數取消初始化並重新啓動。ExecutorService在提交新任務時取消當前任務

由於參數在用戶鍵入每個字符時被組合在配置中,因此可能會在一行中創建大量初始化請求。只有最後一個應該執行。

我們已經放在一起的代碼實現了這個結果,但是看起來這種行爲似乎是實現ExecutorService的非常好的候選者。在我們將所有事情重構爲ExecutorService之前,我想我會詢問世界上是否有類似的實現。

更具體:

的ExecutorService的將有一個工作線程。一旦提交新任務,當前任務就會被取消(並且工作人員中斷)。然後爲下一次執行捕獲新任務。如果提交了另一個任務,則當前任務再次被取消,並且「下一次執行」任務被設置爲該新任務。當工作線程最終拿起下一個執行任務時,它將始終是提交的最後一個任務 - 所有其他任務都被取消或丟棄。

有沒有人有像他們願意分享的實現?或者是否有一個涵蓋此類行爲的標準庫?實現起來並不難,但要確定線程的安全性可能會非常棘手,所以我寧願使用經過驗證的代碼(如果可以的話)。

回答

3

這裏就是我最終想出了 - 我感興趣的任何意見:

public class InterruptingExecutorService extends ThreadPoolExecutor{ 
    private volatile FutureTask<?> currentFuture; 

    public InterruptingExecutorService(boolean daemon) { 
     super(0, 1, 1000L, TimeUnit.MILLISECONDS, 
       new LinkedBlockingQueue<Runnable>(), 
       daemon ? new DaemonThreadFactory() : Executors.defaultThreadFactory()); 

    } 

    public static class DaemonThreadFactory implements ThreadFactory{ 
     ThreadFactory delegate = Executors.defaultThreadFactory(); 

     @Override 
     public Thread newThread(Runnable r) { 
      Thread t = delegate.newThread(r); 
      t.setDaemon(true); 
      return t; 
     } 

    } 

    private void cancelCurrentFuture(){ 
     // cancel all pending tasks 
     Iterator<Runnable> it = getQueue().iterator(); 
     while(it.hasNext()){ 
      FutureTask<?> task = (FutureTask<?>)it.next(); 
      task.cancel(true); 
      it.remove(); 
     } 

     // cancel the current task 
     FutureTask<?> currentFuture = this.currentFuture; 
     if(currentFuture != null){ 
      currentFuture.cancel(true); 
     } 
    } 

    @Override 
    public void execute(Runnable command) { 
     if (command == null) throw new NullPointerException(); 

     cancelCurrentFuture(); 
     if (!(command instanceof FutureTask)){ // we have to be able to cancel a task, so we have to wrap any non Future 
      command = newTaskFor(command, null); 
     } 
     super.execute(command); 
    } 

    @Override 
    protected void beforeExecute(Thread t, Runnable r) { 
     // it is safe to access currentFuture like this b/c we have limited the # of worker threads to only 1 
     // it isn't possible for currentFuture to be set by any other thread than the one calling this method 
     this.currentFuture = (FutureTask<?>)r; 
    } 

    @Override 
    protected void afterExecute(Runnable r, Throwable t) { 
     // it is safe to access currentFuture like this b/c we have limited the # of worker threads to only 1 
     // it isn't possible for currentFuture to be set by any other thread than the one calling this method 
     this.currentFuture = null; 
    } 
} 
+0

很酷的代碼,以更好的方式製作了中斷者。從getQueue()的項目可以投到FutureTask是問題的關鍵 – farmer1992

+0

有趣的是,我發現execute()方法得到了該演員的方式。所以我最後得到了一個覆蓋執行的不同實現,檢查FutureTask是否被傳入,如果沒有,則包裝它。最終的代碼實際上是更清潔的,並且無論ExecutorService如何使用都能正常工作 - 如果我能記得的話,我會在我的代碼得到幫助時發佈更新。最終結果,BTW,非常光滑 - 各種各樣的用例。 –

+0

ok - 更新了代碼 –

1

你可能需要一個DiscardOldestPolicy添加到您的遺囑執行人

http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.DiscardOldestPolicy.html

You will get 
0 submitted 
1 submitted 
2 submitted 
3 submitted 
4 submitted 
5 submitted 
6 submitted 
7 submitted 
8 submitted 
9 submitted 
9 finished 

public static void main(String[] args) throws SecurityException, 
     NoSuchMethodException { 

    final Method interruptWorkers = ThreadPoolExecutor.class 
      .getDeclaredMethod("interruptWorkers"); 
    interruptWorkers.setAccessible(true); 

    ExecutorService executor = new ThreadPoolExecutor(1, 1, 0L, 
      TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), 
      new RejectedExecutionHandler() { 

       @Override 
       public void rejectedExecution(Runnable r, 
         ThreadPoolExecutor executor) { 
        if (!executor.isShutdown()) { 
         try { 

          interruptWorkers.invoke(executor); 
          executor.execute(r); 

         } catch (IllegalArgumentException e) { 
          e.printStackTrace(); 
         } catch (IllegalAccessException e) { 
          e.printStackTrace(); 
         } catch (InvocationTargetException e) { 
          e.printStackTrace(); 
         } 
        } 
       } 
      }); 


    for(int i =0 ;i<10;i++) 
     executor.submit(newTask(i)); 
} 

private static Runnable newTask(final int id) { 
    return new Runnable() { 
     { 

      System.out.println(id + " submitted"); 
     } 

     @Override 
     public void run() { 
      try { 

       Thread.sleep(5000l); 
       System.out.println(id + " finished"); 
      } catch (InterruptedException e) { 
      } 

     } 

    }; 
} 
+0

啊 - 很優雅...不得不打中斷工作者的方法有點令人不安,但這確實看起來像是最好的方法。 –

+0

ok - interruptWorkers不是我正在運行的JDK的一部分(Oracle,1.6.0_34),所以這不起作用。關於一種可以使用公共API的方法的其他想法? –

+0

我使用openjdk 1.6.0_24並從源代碼獲得 – farmer1992