2011-04-25 56 views
0

我有一個隊列與多個生產者 - 一個消費者。消費者定期運行並完全排空隊列(之後沒有消息)。 一個優雅的算法應該運行消費者,並等待它超時或只是等待消費者已經運行。優雅的方式與超時消費者優雅地停止排隊

目前我們有水木清華這樣的:

void stop(boolean graceful) { 
     if (graceful && !checkAndStopDirectly()) { 
      executor.shutdown(); 
      try { 
       if (!executor.awaitTermination(shutdownWaitInterval, shutdownWaitIntervalUnit)) { 
        log.warn("..."); 
       } 
      } catch (InterruptedException e) { 
       log.error("...", e); 
      } 
     } else { 
      executor.shutdownNow(); 
     } 

private boolean checkAndStopDirectly() { 
    ExecutorService shutdownExecutor = Executors.newSingleThreadExecutor(); 
    try { 
     return shutdownExecutor.submit(new Callable<Boolean>(){ 
      @Override 
      public Boolean call() throws Exception { 
       if (isAlreadyRan.compareAndSet(false, true)) { 
        try { 
         runnableDrainTask.run(); 
        } finally { 
         isAlreadyRan.set(false); 
        } 
        return true; 
       } else { 
        return false; 
       } 
      } 
     }).get(shutdownWaitInterval, shutdownWaitIntervalUnit); 

有誰看到更優雅的方式來做到這一點? 例如我正在尋找一種方式W/O使用額外的AtomicBoolean(isAlreadyRan)或雙等待邏輯與時間間隔作爲對象字段等 順便說一句,毒藥模式來到我的腦海裏...

+0

您能否更清楚您的要求?應該等什麼?一個客戶端到stop()調用?它應該等待什麼,隊列流失?你是否想要一個電話stop()來阻塞,直到這個東西被排空,不管是什麼? – Toby 2011-09-06 13:36:53

回答

1

你在這裏談論正常關閉你的應用程序應該做以下事情?

  1. 等待隊列排出,或
  2. 超時,如果排水時間太長

我可能需要了解您如何排排隊,但如果你能做到這一點以可中斷的方式,您可以在得到Future時超時,並嘗試shutdownNow(如果沒有完全耗盡則中斷),無論如何;這對你看起來如何?

ExecutorService pool = Executors.newSingleThreadExecutor(); 

public void stop() { 
    try { 
     pool.submit(new DrainTask()).get(100, MILLISECONDS); 
    } catch (TimeoutException e) { 
     // nada, the timeout indicates the queue hasn't drained yet 
    } catch (InterruptedException e) { 
     Thread.currentThread().interrupt(); 
    } catch (Exception e) { 
     // nada, defer to finally 
    } finally { 
     pool.shutdownNow(); 
    } 
} 

private class DrainTask implements Callable<Void> { 
    @Override 
    public Void call() throws Exception { 
     while (!Thread.currentThread().isInterrupted()) 
      ; // drain away 
     return null; 
    } 
} 

是用來防止多個併發調用stopcompareAndSet一部分?我想我喜歡一般鎖或者使用​​。但是,如果發生衝突,將會拋出ExecutionException,並重復撥打shutdownNow即可。

這種依賴於DrainTask能夠阻止它響應中斷調用的功能(因爲shutdownNow將嘗試在任何當前正在運行的線程上調用中斷)。