2011-07-20 56 views
1

我將正在運行的生產者/使用者示例從線程/可運行轉換爲Executor/Callable/BlockingQueues並使用Poison Pill終止模式。即使所有線程完成後,應用程序仍會掛起幾分鐘

如果您運行下面的程序,即使每個線程都已完成,它仍會掛起幾分鐘。 jstack顯示在與應用程序看似不相關的隊列上阻塞了大量線程。

"pool-1-thread-10" prio=5 tid=10b08d000 nid=0x10d91c000 waiting on condition [10d91b000] 
    java.lang.Thread.State: TIMED_WAITING (parking) 
    at sun.misc.Unsafe.park(Native Method) 
    - parking to wait for <7f3113510> (a java.util.concurrent.SynchronousQueue$TransferStack) 
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198) 
    at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:424) 
    at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:323) 
    at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:874) 
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:945) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907) 
    at java.lang.Thread.run(Thread.java:680) 

我找不出爲什麼應用程序掛起。任何幫助表示讚賞。 謝謝

import java.util.ArrayList; 
import java.util.Collection; 
import java.util.List; 
import java.util.Random; 
import java.util.concurrent.Callable; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.Future; 
import java.util.concurrent.LinkedBlockingQueue; 

public class ProducersConsumers { 
    private LinkedBlockingQueue<Item> queue = new LinkedBlockingQueue<Item>(); 
    private static final ExecutorService executorPool = Executors.newCachedThreadPool(); 
    private Random randGenerator = new Random(System.currentTimeMillis()); 

    private class Item { 
     private boolean done = false; 
     private String message; 

     private Item(boolean done) { 
      this.done = done; 
     } 

     private Item(String message) { 
      this.message = message; 
     } 

     public boolean isDone() { 
      return done; 
     } 

     public String getMessage() { 
      return message; 
     } 
    } 

    private class Producer implements Callable<Long> { 
     private final int id; 
     private Integer numOfMessages; 

     private Producer(int id, int numOfMessages) { 
      this.id = id; 
      this.numOfMessages = numOfMessages; 
     } 

     @Override 
     public Long call() throws Exception { 
      long totalTime = 0; 
      while (numOfMessages > 0) { 
       String message; 
       synchronized (numOfMessages) { 
        long starttime = System.nanoTime(); 
        int msgLength = randGenerator.nextInt(20000); 
        StringBuilder sb = new StringBuilder(msgLength); 
        for (int a = 0; a < msgLength; a++) { 
         sb.append((char) ('a' + randGenerator.nextInt(26))); 
        } 
        message = sb.toString(); 
        long endtime = System.nanoTime(); 
        totalTime += endtime - starttime; 
       } 
       numOfMessages--; 
       queue.put(new Item(message)); 
      } 
      System.out.println("-------------Producer " + id + " is done."); 
      queue.put(new Item(true)); 
      return totalTime; 
     } 
    } 

    private class Consumer implements Callable<Long> { 
     private String monitor = "monitor"; 
     private final int id; 

     private Consumer(int id) { 
      this.id = id; 
     } 

     @Override 
     public Long call() throws Exception { 
      long totalTime = 0; 
      while (true) { 
       Item item = queue.take(); 
       if (item.isDone()) { 
        break; 
       } 
       synchronized (monitor) { 
        long starttime = System.nanoTime(); 
        StringBuilder sb = new StringBuilder(item.getMessage()); 
        sb = sb.reverse(); 
        String message = sb.toString(); 
        long endtime = System.nanoTime(); 
        totalTime += endtime - starttime; 
       } 
      } 
      System.out.println("+++++++++++++Consumer " + id + " is done."); 
      return totalTime; 
     } 
    } 

    public void begin(int threadCount) throws InterruptedException, ExecutionException { 
     Collection<Producer> producers = new ArrayList<Producer>(); 
     for (int i = 0; i < threadCount; i++) { 
      producers.add(new Producer(i, randGenerator.nextInt(5))); 
     } 
     Collection<Consumer> consumers = new ArrayList<Consumer>(); 
     for (int i = 0; i < threadCount; i++) { 
      consumers.add(new Consumer(i)); 
     } 
     try { 
      long starttime = System.nanoTime(); 
      List<Future<Long>> producerFutureList = executorPool.invokeAll(producers); 
      List<Future<Long>> consumerFutureList = executorPool.invokeAll(consumers); 
      long producerTotalTime = 0; 
      long consumerTotalTime = 0; 

      for (Future<Long> future : producerFutureList) { 
       producerTotalTime += future.get(); 
      } 
      for (Future<Long> future : consumerFutureList) { 
       consumerTotalTime += future.get(); 
      } 
      long mainThreadTotalTime = System.nanoTime() - starttime; 

      System.out.println("producerTotalTime " + producerTotalTime); 
      System.out.println("consumerTotalTime " + consumerTotalTime); 
      System.out.println("mainThreadTotalTime " + mainThreadTotalTime); 
      System.out.println("Difference   " + (producerTotalTime + consumerTotalTime - mainThreadTotalTime)); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. 
      throw e; 
     } catch (ExecutionException e) { 
      e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. 
      throw e; 
     } 

    } 

    public static void main(String[] args) throws ExecutionException, InterruptedException { 
     ProducersConsumers prodcon = new ProducersConsumers(); 
     prodcon.begin(20); 
    } 
} 

回答

4

當你完成它時你應該關閉ExecutorService。在程序結束時調用executorPool.shutdown()。

+0

ExecutorService的文檔沒有說明您完成後需要關閉,但這些示例都是這樣。 – ptomli

+0

就是這樣!調用shutdown()可以解決問題。非常感謝你。 – jabawaba

+0

@ptomli我知道你發表評論已經很長時間了。起初,我想到了同樣的事情。但是最近,我在[docs](http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ExecutorService.html)中找到了一些支持「shutdown」的東西。他們聲明:*一個未使用的ExecutorService應該關閉以允許其資源的回收。*因此,當你完成你的'ExecutorService'時,你應該總是調用'shutdown'。 –

0

您似乎正在使用共享資源,特別是在同步塊之外使用numOfMessages

while (numOfMessages > 0) { 
    // blah 
    synchronized (numOfMessages) { 
     // blah 
    } 
} 

我不認爲這是你的問題的原因,但它肯定是非線程安全的。這是一個典型的檢查後行爲場景。爲什麼這是Not Good(TM),請參閱Java Concurrency in PracticeEffective Java

+0

numOfMessages在此處不作爲同步監視器。正如你注意到的,numOfMessages是對象級別的,因此沒有人會在一個線程之外訪問它。然而,對於同步塊的原因是向JVM表明它不應當在塊內執行時中斷此線程,以便記錄準確的總執行時間。如果線程由於加載而換出,則記錄的處理時間將包括等待狀態階段,因此不準確。感謝您提出 – jabawaba

+0

或者您確定這是真的嗎?我寫了一個示例程序,似乎即使在同步塊內也有上下文切換。 – duduamar

+0

嗯。現在還不確定。我以爲我在很多年前瞭解到這一點,但在您提到它之後,我無法驗證它。有沒有阻止調度程序在關鍵塊中交換線程? – jabawaba

相關問題