2010-02-11 12 views
22

我使用ExecutorService來簡化併發多線程程序。採取下面的代碼:ExecutorService,避免任務隊列變得太滿的標準方式

 

while(xxx) 
ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS); 
... 
Future<..> ... = exService.submit(..); 
... 
} 
 

在我的情況的問題是,提交()如果所有NUMBER_THREADS被佔用,不堵。結果是任務隊列被許多任務淹沒。這樣做的結果是,使用ExecutorService.shutdown()關閉執行服務需要很長時間(ExecutorService.isTerminated()將長時間爲false)。原因是任務隊列仍然很滿。

現在我的解決方法是用旗語合作,不允許有很多條目的ExecutorService的任務隊列中:

 

... 
Semaphore semaphore=new Semaphore(NUMBER_THREADS); 

while(xxx) 
ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS); 
... 
semaphore.aquire(); 
// internally the task calls a finish callback, which invokes semaphore.release() 
// -> now another task is added to queue 
Future<..> ... = exService.submit(..); 
... 
} 
 

我肯定有一個更好的多個封裝的解決方案?

回答

4

你最好自己創建ThreadPoolExecutor(這正是Executors.newXXX()的作用)。

在構造函數中,您可以傳入一個BlockingQueue以供Executor用作其任務隊列。如果你傳遞一個大小受限的BlockingQueue(如LinkedBlockingQueue),它應該達到你想要的效果。

ExecutorService exService = new ThreadPoolExecutor(NUMBER_THREADS, NUMBER_THREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(workQueueSize)); 
+0

我明白了。我在http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool%28int%29忽略了一個細節。在那裏提到,使用無界隊列作爲標準。 – 2010-02-11 21:37:03

+6

我試過了。不幸的是,您的解決方案不會阻止(如我所願),但會拋出RejectedExecutionException。還發現:http://www.velocityreviews.com/forums/t389526-threadpoolexecutor-with-blocking-execute.html。所提出的解決方法似乎更復雜,因爲我的信號例子,該死! – 2010-02-11 23:53:22

+3

由於RejectedExecutionException,如果隊列已滿,此工作不工作 – user249654 2012-05-21 12:51:51

5

您可以使用ThreadPoolExecutor.getQueue()。size()來查找等待隊列的大小。如果隊列太長,你可以採取行動。我建議在當前線程中運行任務,如果隊列太長會減慢生產者的速度(如果合適的話)

4

一個真正的阻塞ThreadPoolExecutor已經被許多人的願望清單所佔據,甚至還有一個打開的JDC錯誤。 我面臨同樣的問題,並遇到了這個: http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html

這是一個BlockingThreadPoolExecutor的實現,實現並採用使用提供給任務添加到隊列,等待隊列中有房間RejectionPolicy。看上去不錯。

+0

[This answer](http://stackoverflow.com/a/4522411/394431)給另一個問題建議使用一個自定義的'BlockingQueue'子類,通過委託給'put()'來阻塞'offer()'。我認爲最終的工作或多或少與這個'RejectedExecutionHandler'一樣。 – 2014-03-29 20:55:19

1

你可以添加另一個有限大小的bloquing隊列來控制executorService中的內部隊列的大小,有些人認爲像信號量但很容易。 執行器之前你把()和當任務存檔採取()。採取()必須是任務代碼

21

的技巧是使用一個固定的隊列大小和內部:

new ThreadPoolExecutor.CallerRunsPolicy() 

我還建議使用番石榴的ListeningExecutorService。 下面是一個示例消費者/生產者隊列。

private ListeningExecutorService producerExecutorService = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); 
private ListeningExecutorService consumerExecutorService = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); 

private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { 
    return new ThreadPoolExecutor(nThreads, nThreads, 
            5000L, TimeUnit.MILLISECONDS, 
            new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); 
} 

更好的是,你可能想考慮像RabbitMQ或ActiveMQ這樣的MQ,因爲他們有QoS技術。

+0

很好的答案,完整的例子。 – 2015-04-28 21:25:11