2015-04-29 79 views
7

是否有一個ExecutorService實現行爲像一個具有以下特徵的線程池?執行器服務,擴展線程,然後排隊任務

  1. 總是有至少X個活動線程。
  2. 如果提交任務並且所有活動線程都忙,則會啓動一個新線程,最多爲Y個線程。
  3. 如果一個任務被提交併且所有的Y線程都忙,那麼這個任務就排隊了。
  4. 如果沒有提交新的任務,池會縮減回X個活動線程。

相當標準的線程池行爲。你會認爲ThreadPoolExecutor會處理這個問題,但如果超過10個任務被提交

executorService = new ThreadPoolExecutor(
    2, 10, // min/max threads 
    60, TimeUnit.SECONDS, // time of inactivity before scaling back 
    new SynchronousQueue<Runnable>()); // task queue 

會拋出異常。切換到LinkedBlockingQueue永遠不會超過這兩個最小線程,除非您限制像new LinkedBlockingQueue<Runnable>(20)這樣的大小,在這種情況下,將有兩個線程處理1-20個任務,2-10個線程處理21-30個任務,超過30個任務。不漂亮。與此同時,固定線程池永遠不會縮減非活動線程。

因此,爲了得到我所追求的,我可以使用其他種類的BlockingQueue還是擺弄我錯過的其他設置?是否還有另一個ExceutorService實現更適合(哪個?),還是通過覆蓋ThreadPoolExecutor的​​方法來滾動我自己?

+2

我不認爲一些不活躍的線程會做多大的危害? – JonasCz

+0

我也不理解你對非活動線程的關注。 Afaik,他們坐在那裏休息,幾乎沒有間接費用。 –

回答

4

不幸的是,答案是否定的。關於你可以做的最好的事情是有效地沒有螺紋地板,只有天花板。這可以通過allowing core threads to timeout完成。

ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 10, 60, TimeUnit.Seconds, new LinkedBlockingQueue<Runnable>()); 
tpe.allowCoreThreadTimeOut(true); 

因爲核心大小爲10,當任務被提交到10個線程是活動的一個新的線程將被啓動。之後,任務將在LinkedBlockingQueue中排隊。如果線程在60秒內處於非活動狀態,它將終止。

想要的行爲可以通過編寫一個實現BlockingQueue和RejectedExecutionHandler的類來實現,該類在確定任務應添加到隊列還是拒絕之前檢查ThreadPoolExecutors當前狀態。

+0

這正是我所需要的,儘管我使用的是有限大小的ArrayBlockingQueue。我想補充一點,你可以使用[ExecutorService.prestartAllCoreThreads()](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html#prestartAllCoreThreads())和調整如果您預計有一定的最小負載,您的空閒超時模仿「永遠在線」的核心線程行爲。 – MandisaW

0

其實SynchronousQueue具有零基本上是大小和原因在於,當您提交超過10個任務(最大池大小)在拒絕策略踢和任務被拒絕。同樣對於每個提交的任務,它都會獲得一個線程並使用它。

當您使用大小爲20的LinkedBlockingQueue時,如果已達到核心大小並且沒有空閒線程,它將對任務排隊。除非隊列已滿,否則它們將繼續排隊,在這種情況下,它將創建一個新線程用於新任務。當池大小達到最大時,拒絕策略會進入任何新的提交。

您希望每當提交任務並且核心(核心大小)中的所有線程都忙時,您的任務應該得到一個新線程,而不是在隊列中等待,我認爲這是不可能的。

+0

這絕不會使用2個以上的線程。 –

+0

@Brett你是對的我只是犯了一個錯誤。編輯答案。 –

1

這應該做的伎倆:

ThreadPoolExecutor tpe = new ThreadPoolExecutor(1, maxPoolSize, 60L, TimeUnit.SECONDS, 
      new SynchronousQueue<Runnable>()); 
    tpe.setRejectedExecutionHandler((
      Runnable r, 
      ThreadPoolExecutor executor) -> { 
     // this will block if the queue is full but not until the sun will rise in the west! 
      try { 
       if(!executor.getQueue().offer(r, 15, TimeUnit.MINUTES)){ 
        throw new RejectedExecutionException("Will not wait infinitely for offer runnable to ThreadPoolExecutor"); 
       } 
      } catch (InterruptedException e) { 
       throw new RejectedExecutionException("Unable to put runnable to queue", e); 
      } 
     });