2013-10-16 34 views
1
的execute()

我正在尋找一個的ThreadPoolExecutor,當它的任務隊列已滿,將阻止 - 當前的Java實現拒絕新的任務,如果底層隊列已滿 -覆蓋中的ThreadPoolExecutor

public void execute(Runnable command) { 
    if (command == null) 
     throw new NullPointerException(); 
    /* 
    * Proceed in 3 steps: 
    * 
    * 1. If fewer than corePoolSize threads are running, try to 
    * start a new thread with the given command as its first 
    * task. The call to addWorker atomically checks runState and 
    * workerCount, and so prevents false alarms that would add 
    * threads when it shouldn't, by returning false. 
    * 
    * 2. If a task can be successfully queued, then we still need 
    * to double-check whether we should have added a thread 
    * (because existing ones died since last checking) or that 
    * the pool shut down since entry into this method. So we 
    * recheck state and if necessary roll back the enqueuing if 
    * stopped, or start a new thread if there are none. 
    * 
    * 3. If we cannot queue task, then we try to add a new 
    * thread. If it fails, we know we are shut down or saturated 
    * and so reject the task. 
    */ 
    int c = ctl.get(); 
    if (workerCountOf(c) < corePoolSize) { 
     if (addWorker(command, true)) 
      return; 
     c = ctl.get(); 
    } 
    if (isRunning(c) && workQueue.offer(command)) { 
     int recheck = ctl.get(); 
     if (! isRunning(recheck) && remove(command)) 
      reject(command); 
     else if (workerCountOf(recheck) == 0) 
      addWorker(null, false); 
    } 
    else if (!addWorker(command, false)) 
     reject(command); 
} 

會改變這種行:

if (isRunning(c) && workQueue.offer(command)) { 

TO

if (isRunning(c) && workQueue.put(command)) { 

取得成功?我錯過了什麼嗎?

SOLUTION(可能有助於下一人):

public class BlockingThreadPoolExecutor extends ThreadPoolExecutor { 

    private final Semaphore runLock; 

    public BlockingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 
      long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, int maxTasks) { 
     super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); 
     runLock = new Semaphore(maxTasks); 
    } 

    @Override 
    protected void beforeExecute(Thread t, Runnable r) { 
     runLock.acquireUninterruptibly(); 
    } 

    @Override 
    protected void afterExecute(Runnable r, Throwable t) { 
     runLock.release(); 
    } 

} 
+0

當你嘗試時會發生什麼? – alterfox

+0

還沒有 - 如果出現明顯的死鎖,這很容易,但僅僅運行一兩個測試就很難診斷死鎖。 – Gandalf

回答

4

ThreadPoolExecutor狀態和設置要看,因爲不是所有的任務提交經過BlockingQueue。通常你只是想將ThreadPoolExecutorRejectedExecutionHandler更改爲ThreadPoolExecutor.CallerRunsPolicy,這將會提交提交。如果你真的想阻止提交,那麼你應該使用CompletionService,並在你想阻止時調用'take'方法。您也可以創建一個子類並使用Semaphore來阻止執行方法。有關更多信息,請參閱JDK-6648211 : Need for blocking ThreadPoolExecutor

+0

謝謝!我爲使用Semaphore的問題添加了一個解決方案 - 非常簡單。 – Gandalf