2017-10-05 95 views
0

當我運行下面的代碼:如何使用純ThreadPoolExecutor獲取MoreExecutors.newDirectExecutorService()行爲?

package foo.trials; 

import com.google.common.util.concurrent.MoreExecutors; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import java.util.Random; 
import java.util.concurrent.Callable; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Future; 
import java.util.concurrent.Semaphore; 
import java.util.concurrent.SynchronousQueue; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 

public class DirectExecutorService { 
    private static final Logger logger_ = LoggerFactory.getLogger(DirectExecutoService.class); 

    public static void main(String[] args) { 
     boolean useGuava = true; 

     final ExecutorService directExecutorService; 
     if (useGuava) { 
      directExecutorService = MoreExecutors.newDirectExecutorService(); 
     } else { 
      directExecutorService = new ThreadPoolExecutor(
        0, 1, 0, TimeUnit.DAYS, 
        new SynchronousQueue<Runnable>(), 
        new ThreadPoolExecutor.CallerRunsPolicy()); 
      directExecutorService.submit(new BlockingCallable()); 
     } 

     Future<Boolean> future = directExecutorService.submit(new MyCallable()); 
     try { 
      logger_.info("Result: {}", future.get()); 
     } catch (InterruptedException e) { 
      logger_.error("Unexpected: Interrupted!", e); 
     } catch (ExecutionException e) { 
      logger_.error("Unexpected: Execution exception!", e); 
     } 
     logger_.info("Exiting..."); 
    } 

    static class MyCallable implements Callable<Boolean> { 
     static final Random _random = new Random(); 
     @Override 
     public Boolean call() throws Exception { 
      logger_.info("In call()"); 
      return _random.nextBoolean(); 
     } 
    } 

    static class BlockingCallable implements Callable<Boolean> { 

     Semaphore semaphore = new Semaphore(0); 
     @Override 
     public Boolean call() throws Exception { 
      semaphore.acquire(); // this will never succeed. 
      return true; 
     } 
    } 
} 

我得到以下輸出

13:36:55.960 [main] INFO a.t.DirectExecutoService - In call() 
13:36:55.962 [main] INFO a.t.DirectExecutoService - Result: true 
13:36:55.963 [main] INFO a.t.DirectExecutoService - Exiting... 

注意,所有的執行發生在main線程。特別是可調用的調用get在調用線程中調度。當然,這是從MoreExecutors.newDirectExecutorService()預計那裏不會有什麼驚喜。

當我將變量useGuava設置爲false時,我得到了類似的結果。

13:45:14.264 [main] INFO a.t.DirectExecutoService - In call() 
13:45:14.267 [main] INFO a.t.DirectExecutoService - Result: true 
13:45:14.268 [main] INFO a.t.DirectExecutoService - Exiting... 

,如果我註釋掉以下行

directExecutorService.submit(new BlockingCallable()); 

然後我得到以下輸出。

13:37:27.355 [pool-1-thread-1] INFO a.t.DirectExecutoService - In call() 
13:37:27.357 [main] INFO a.t.DirectExecutoService - Result: false 
13:37:27.358 [main] INFO a.t.DirectExecutoService - Exiting... 

正如人們所看到的可調用的調用發生在不同的線程pool-1-thread-1。我想我可以解釋爲什麼會發生這種情況。也許因爲線程池可以有(最多)1個可用的線程,所以第一個Callable被分派到該額外線程,否則該線程被BlockingCallable消耗。

我的問題是,如何創建一個ExecutorService,將DirectExecutorService做什麼,而不必人爲地燃燒線程與永不完成的可調用?

爲什麼我問這個?

  1. 我有一個在版本11.0使用番石榴的代碼庫。我需要避免將其升級到17.0+ - 如果可以的話,它提供MoreExecutors.newDirectExecutorService()
  2. ThreadPoolExecutor不允許將maxThreads設置爲0.如果允許,那將會很奇怪,但如果確實如此,那也能解決我的問題。
  3. 最後,我很驚訝地發現這種行爲 - 我本以爲(錯誤地),使用CallerRunsPolicy會導致call所有Callable所有s到在調用者的線程中執行。所以,我想把我的經驗和竅門放在那裏,這樣別人就可以節省最後燃燒的時間來試圖理解這一點。 :(

有沒有更好/更地道的方式來實現DirectExecutorService樣的行爲如果不能升級到番石榴17.0+?

回答

2

有沒有更好/更地道的方式來實現DirectExecutorService樣的行爲,如果一個不能升級到番石榴17.0+?

如果這是你唯一的問題在這裏,你應該使用MoreExecutors.sameThreadExecutor()。它基本上newDirectExecutorService()之前,它被轉移到一個新的方法(和加入directExecutor()),見Javadoc

由於:18.0(目前爲MoreExecutors.sameThreadExecutor()自10.0)

BTW:你應該升級到最新的番石榴,你正在使用幾乎六歲的一個!

+0

就是這樣,謝謝!關於升級的觀點很好。 –

相關問題