2012-08-06 191 views
0

我有一個非常基本的線程池代碼。 它調用一個位於linkedblockingqueue內的工作對象池。 該代碼通過重新循環工作對象來輸出輸入數據。LinkedBlockingQueue中的死鎖(?)

我找到一致的僵局/有以下凍結:

public class throttleheapthreadpool{ 
      private quoteworkerobject[] channels; 
      private LinkedBlockingQueue<quoteworkerobject> idlechannels; 
      public throttleheapthreadpool(int poolsize,int stocks){ 
     channels=new quoteworkerobject[poolsize]; 
     idlechannels=new LinkedBlockingQueue<quoteworkerobject>(); 

     for(int i=1;i<poolsize;i++){ 
      channels[i]=new quoteworkerobject(idlechannels); 
      idlechannels.add(channels[i]);//All WORKERS to Idle pool to start 
     } 
    } 

    public void execute(Integer quote){ 
     quoteworkerobject current = null; 
     try { 
         //extract worker from pool 
      current = (quoteworkerobject)idlechannels.take(); 
      current.put(quote); 
     } catch (InterruptedException e) { 
     } 
    } 

    class quoteworkerobject{ 
     LinkedBlockingQueue<Integer> taskqueue=new LinkedBlockingQueue<Integer>(); 
     Thread quotethread=null; 
     LinkedBlockingQueue<quoteworkerobject> idle=null; 
     @SuppressWarnings("unchecked") 
     public quoteworkerobject(LinkedBlockingQueue<quoteworkerobject> idlechannels){ 
      this.idle=idlechannels; 
      Runnable r=new Runnable(){ 
       public void run() { 
        insertquote(); 
       } 
      }; 
      quotethread=new Thread(r); 
      quotethread.start();//spawn a thread from the worker 
     } 
     public void put(Integer quote){ 
      taskqueue.add(quote); 
     } 
     public void insertquote(){ 
      try{ 
       Integer thisquote=taskqueue.take(); 
       idle.add(this); 
      } 
      catch(Exception ex){ 
      } 

     } 

    } 

    public static void main(String[] args){ 
     throttleheapthreadpool pool=new throttleheapthreadpool(5,200); 
     Random randomGenerator = new Random(); 

     for(int node=0;node < 20;node++){ 
      int d=randomGenerator.nextInt(5*200); 
      pool.execute(d); 
     } 
    } 

} 

此代碼一致凍結8日執行 - 在點 電流=(quoteworkerobject)idlechannels.take();

上述有什麼問題?

+0

是否將池的大小從5更改爲20有幫助? – scientiaesthete 2012-08-06 17:13:49

+0

@RanadhirNag投票或接受你喜歡的答案。 ;) – Eugene 2012-08-07 13:40:36

回答

1

看下面我的重構(仍然不完美,但稍微可讀)。你的問題如下:

  • 創建4名工人(循環從1到5的throttleheapthreadpool構造)
  • 每個工人運行insertquote一次在一個單獨的線程,並返回到空閒池

所以,總體來說,您提交的4個作業,其得到完成,工人回到隊列,然後給他們額外的4個職位(這就是共8個),只是因爲他們不消耗工作的insertquote方法已退出。

解決方法:運行insertquote在while循環:

public void insertquote() { 
    try { 
     while (true) { 
      taskqueue.take(); 
      idle.add(this); 
     } 
    } catch (Exception ex) { 
    } 
} 

的信息,這裏是我當前版本的代碼:

public class ThrottleHeapThreadPool { 

    private final BlockingQueue<QuoteWorkerObject> idlechannels = new LinkedBlockingQueue<QuoteWorkerObject>(); 

    public static void main(String[] args) { 
     ThrottleHeapThreadPool pool = new ThrottleHeapThreadPool(5, 200); 
     Random randomGenerator = new Random(); 

     for (int node = 0; node < 20; node++) { 
      int d = randomGenerator.nextInt(5 * 200); 
      pool.execute(d); 
     } 
    } 

    public ThrottleHeapThreadPool(int poolsize, int stocks) { 

     for (int i = 1; i < poolsize; i++) { 
      QuoteWorkerObject worker = new QuoteWorkerObject(idlechannels); 
      idlechannels.add(worker);//All WORKERS to Idle pool to start 
      worker.init(); 
     } 
    } 

    public void execute(Integer quote) { 
     try { 
      //extract worker from pool 
      QuoteWorkerObject worker = idlechannels.take(); 
      worker.put(quote); 
     } catch (InterruptedException e) { 
     } 
    } 

    class QuoteWorkerObject { 

     private final BlockingQueue<Integer> taskqueue = new LinkedBlockingQueue<Integer>(); 
     private final BlockingQueue<QuoteWorkerObject> idle; 

     @SuppressWarnings("unchecked") 
     public QuoteWorkerObject(BlockingQueue<QuoteWorkerObject> idlechannels) { 
      this.idle = idlechannels; 
     } 

     public void init() { 
      new Thread(new Runnable() { 
       public void run() { 
        insertquote(); 
       } 
      }).start(); 
     } 

     public void put(Integer quote) { 
      taskqueue.add(quote); 
     } 

     public void insertquote() { 
      try { 
       while (true) { 
        taskqueue.take(); 
        idle.add(this); 
       } 
      } catch (Exception ex) { 
      } 
     } 
    } 
} 
2

這就是爲什麼我(?恨)不喜歡使用這樣的代碼。你應該考慮讓你/我們的生活更輕鬆,並且編寫代碼,甚至在幾個月後你可以看看並且證明它:相應地給你的變量命名,寫簡短的文檔或者解釋等等。我花了25分鐘來重構,因爲我無法理解正在發生的事情。

我添加了一個小的重構,並且我還添加了一些斷點,查看代碼 - 解釋在裏面。但問題出在insertQuote方法中 - 它完成得太早。

import java.util.Random; 
import java.util.concurrent.LinkedBlockingQueue; 

public class Pool { 
private Worker[] workers; 
private LinkedBlockingQueue<Worker> workerQueue; 



/** 
* Create a pool of 5 workers and a {@link LinkedBlockingQueue} to store them 
*/ 
public Pool(int poolsize) { 
    //1. First you get here : you create a Pool of 5 Worker Threads and a Queue to store them 
    System.out.println("1."); 
    workers = new Worker[poolsize]; 
    workerQueue = new LinkedBlockingQueue<Worker>(); 

    for (int i = 0; i < poolsize; i++) { 
     //2. You instantiate 5 worker Threads and place each of them on the Queue 
     System.out.println("2."); 
     workers[i] = new Worker(workerQueue); 
     workerQueue.add(workers[i]); 
    } 
} 

public void execute(Integer quote) { 
    Worker current = null; 
    try { 
     // extract worker from pool 
     //6. Get a worker from the Queue 
     System.out.println("6."); 
     current = workerQueue.take(); 
     current.put(quote); 
    } catch (InterruptedException e) { 
    } 
} 


/** 
* 
* 
*/ 
class Worker { 
    LinkedBlockingQueue<Integer> taskqueueForEachWorker = new LinkedBlockingQueue<Integer>(); 
    LinkedBlockingQueue<Worker> workerQueue = null; 

    public Worker(LinkedBlockingQueue<Worker> idlechannels) { 
     new Thread(new Runnable() { 
      @Override 
      public void run() { 
       //3. You call the insert quote method 
       System.out.println("3."); 
       insertquote(); 
      } 
     }).start(); 
    } 

    public void put(Integer quote) { 
     //7. Add a task for each Thread to do 
     System.out.println("7."); 
     taskqueueForEachWorker.add(quote); 
    } 

    //TODO The problem is here: After you execute this line : workerQueue.add(this); this method ends, NO MORE worker Threads are put on the queue, 
    // thus at point 6 you block, well because there are no more worker Threads an no one add them. 

    public void insertquote() { 
     try { 
      // 4. You try to take an Integer from the Pool of tasks from rach Thread, but there is nothing yet - it is empty, thus each Thread (worker) 
      // blocks here, waiting for a task 
      System.out.println("4."); 
      Integer thisquote = taskqueueForEachWorker.take(); // This will successed only after 7. 
      workerQueue.add(this); 
     } catch (Exception ex) { 
     } 
    } 
} 

public static void main(String[] args) { 
    Pool pool = new Pool(5); 
    Random randomGenerator = new Random(); 

    for (int node = 0; node < 20; node++) { 
     int d = randomGenerator.nextInt(5 * 200); 
     System.out.println("5."); 
     pool.execute(d); 
    } 
} 

}

輸出將是1。 2. 3. 4. 2. 3. 4. 2. 3. 4. 2。 3. 4. 2. 3. 4. 5. 6. 7. 5. 6. 7. 5. 6. 7. 5. 6. 7. 5. 6. 7. 5。 6.

看到最後一行是6.如果因爲insertQuote方法已退出而出現塊,因此隊列現在爲空,則所有工作線程都已被佔用。

另外,在我看來,因爲你的工作線程每個都使用一個分離的隊列,你應該實施「工作被盜」模式,或Deque。也看看那個。