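I converted a working producer/consumer example from Thread/Runnable to Executor/Callable/BlockingQueue, using the Poison Pill termination pattern. The application still hangs for a few minutes even after all threads have finished.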
If you run the program below, it hangs for a few minutes even though every thread has finished. jstack shows a large number of threads blocked on a queue that appears to be unrelated to the application.

    "pool-1-thread-10" prio=5 tid=10b08d000 nid=0x10d91c000 waiting on condition [10d91b000]
       java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <7f3113510> (a java.util.concurrent.SynchronousQueue$TransferStack)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
        at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:424)
        at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:323)
        at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:874)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:945)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
        at java.lang.Thread.run(Thread.java:680)

I can't figure out why the application hangs. Any help is appreciated. Thanks.

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.LinkedBlockingQueue;

    public class ProducersConsumers {
        private LinkedBlockingQueue<Item> queue = new LinkedBlockingQueue<Item>();
        private static final ExecutorService executorPool = Executors.newCachedThreadPool();
        private Random randGenerator = new Random(System.currentTimeMillis());

        private class Item {
            private boolean done = false;
            private String message;

            private Item(boolean done) {
                this.done = done;
            }

            private Item(String message) {
                this.message = message;
            }

            public boolean isDone() {
                return done;
            }

            public String getMessage() {
                return message;
            }
        }

        private class Producer implements Callable<Long> {
            private final int id;
            private Integer numOfMessages;

            private Producer(int id, int numOfMessages) {
                this.id = id;
                this.numOfMessages = numOfMessages;
            }

            @Override
            public Long call() throws Exception {
                long totalTime = 0;
                while (numOfMessages > 0) {
                    String message;
                    synchronized (numOfMessages) {
                        long starttime = System.nanoTime();
                        int msgLength = randGenerator.nextInt(20000);
                        StringBuilder sb = new StringBuilder(msgLength);
                        for (int a = 0; a < msgLength; a++) {
                            sb.append((char) ('a' + randGenerator.nextInt(26)));
                        }
                        message = sb.toString();
                        long endtime = System.nanoTime();
                        totalTime += endtime - starttime;
                    }
                    numOfMessages--;
                    queue.put(new Item(message));
                }
                System.out.println("-------------Producer " + id + " is done.");
                queue.put(new Item(true));
                return totalTime;
            }
        }

        private class Consumer implements Callable<Long> {
            private String monitor = "monitor";
            private final int id;

            private Consumer(int id) {
                this.id = id;
            }

            @Override
            public Long call() throws Exception {
                long totalTime = 0;
                while (true) {
                    Item item = queue.take();
                    if (item.isDone()) {
                        break;
                    }
                    synchronized (monitor) {
                        long starttime = System.nanoTime();
                        StringBuilder sb = new StringBuilder(item.getMessage());
                        sb = sb.reverse();
                        String message = sb.toString();
                        long endtime = System.nanoTime();
                        totalTime += endtime - starttime;
                    }
                }
                System.out.println("+++++++++++++Consumer " + id + " is done.");
                return totalTime;
            }
        }

        public void begin(int threadCount) throws InterruptedException, ExecutionException {
            Collection<Producer> producers = new ArrayList<Producer>();
            for (int i = 0; i < threadCount; i++) {
                producers.add(new Producer(i, randGenerator.nextInt(5)));
            }
            Collection<Consumer> consumers = new ArrayList<Consumer>();
            for (int i = 0; i < threadCount; i++) {
                consumers.add(new Consumer(i));
            }
            try {
                long starttime = System.nanoTime();
                List<Future<Long>> producerFutureList = executorPool.invokeAll(producers);
                List<Future<Long>> consumerFutureList = executorPool.invokeAll(consumers);
                long producerTotalTime = 0;
                long consumerTotalTime = 0;
                for (Future<Long> future : producerFutureList) {
                    producerTotalTime += future.get();
                }
                for (Future<Long> future : consumerFutureList) {
                    consumerTotalTime += future.get();
                }
                long mainThreadTotalTime = System.nanoTime() - starttime;
                System.out.println("producerTotalTime " + producerTotalTime);
                System.out.println("consumerTotalTime " + consumerTotalTime);
                System.out.println("mainThreadTotalTime " + mainThreadTotalTime);
                System.out.println("Difference " + (producerTotalTime + consumerTotalTime - mainThreadTotalTime));
            } catch (InterruptedException e) {
                e.printStackTrace();
                throw e;
            } catch (ExecutionException e) {
                e.printStackTrace();
                throw e;
            }
        }

        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ProducersConsumers prodcon = new ProducersConsumers();
            prodcon.begin(20);
        }
    }

The documentation for ExecutorService doesn't say that you need to shut it down when you're done, but the examples all do. – ptomli
That was it! Calling shutdown() fixes the problem. Thank you very much. – jabawaba
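@ptomli I know it has been a long time since you commented. At first I thought the same thing. But recently I found something in the [docs](http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ExecutorService.html) that supports calling `shutdown`. They state: *An unused ExecutorService should be shut down to allow reclamation of its resources.* So you should always call `shutdown` when you are done with your `ExecutorService`. –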
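
For anyone landing here later, a minimal sketch of the fix the comments describe. The stack trace in the question is consistent with idle worker threads of a cached thread pool: `Executors.newCachedThreadPool()` keeps unused threads for sixty seconds, and the default pool threads are non-daemon, so the JVM stays alive until they time out unless the pool is shut down. The class name `ShutdownSketch`, the helper `runAndShutdown`, and the five-second wait are assumptions made for illustration; they are not part of the original program.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ShutdownSketch {

    // Hypothetical helper: submits taskCount trivial tasks, waits for them,
    // and always shuts the pool down so idle workers do not keep the JVM alive.
    public static int runAndShutdown(int taskCount) throws Exception {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
            for (int i = 0; i < taskCount; i++) {
                final int id = i;
                tasks.add(new Callable<Integer>() {
                    @Override
                    public Integer call() {
                        return id;
                    }
                });
            }
            int completed = 0;
            // invokeAll blocks until every task has finished.
            for (Future<Integer> future : pool.invokeAll(tasks)) {
                future.get();
                completed++;
            }
            return completed;
        } finally {
            // Stop accepting new tasks; idle workers then exit promptly.
            pool.shutdown();
            // Assumed timeout, only to bound the wait in this sketch.
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("completed " + runAndShutdown(4));
    }
}
```

In the question's program, the equivalent change would be to call `executorPool.shutdown()` once `begin(...)` returns, ideally in a `finally` block so it also runs when a task throws.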