2013-07-19 156 views
1

在java中,我試圖使用下面的代碼使用簡單的wait和notifyAll()方法編寫生產者和消費者實現。它運行幾秒鐘,稍後掛起。任何想法如何解決這個問題。如何解決生產者消費者中的死鎖

import java.util.ArrayDeque; 
import java.util.Queue; 

public class Prod_consumer { 
    static Queue<String> q = new ArrayDeque(10); 

    static class Producer implements Runnable { 
     public void run() { 
      while (true) { 
       if (q.size() == 10) { 
        synchronized (q) { 
         try { 
          System.out.println("Q is full so waiting"); 
          q.wait(); 
         } catch (InterruptedException ex) { 
          ex.printStackTrace(); 
         } 
        } 
       } 
       synchronized (q) { 
        String st = System.currentTimeMillis() + ""; 
        q.add(st); 
        q.notifyAll(); 
       } 
      } 
     } 

    } 

    static class Consumer implements Runnable { 
     public void run() { 
      while (true) { 
       if (q.isEmpty()) { 
        synchronized(q) { 
         try { 
          System.out.println("Q is empty so waiting "); 
          q.wait(); 
         }catch(InterruptedException ie) { 
          ie.printStackTrace(); 
         } 
        } 
       } 
       synchronized(q) { 
        System.out.println(q.remove()); 
        q.notifyAll(); 
       } 

      } 

     } 
    } 

    public static void main(String args[]) { 
     Thread consumer = new Thread(new Consumer()); 
     Thread consumer2 = new Thread(new Consumer()); 
     Thread producer = new Thread(new Producer()); 

     producer.start(); 
     consumer.start(); 
     consumer2.start(); 

    } 

} 

回答

2

Producer代碼看起來可疑。您希望等到隊列大小低於10,然後添加下一個元素。但是,使用當前的邏輯,您將等待通知(不管其原因),不要檢查隊列是否超出容量,然後釋放隊列中的鎖定。然後,您重新鎖定隊列並添加項目(不管另一個線程是否將某些內容放入隊列中)。

我建議這樣的代碼:

static class Producer implements Runnable { 
    public void run() { 
     while (true) { 
      synchronized (q) { 
       if (q.size() < 10) { 
        String st = System.currentTimeMillis() + ""; 
        q.add(st); 
        q.notifyAll(); 
       } else { 
        try { 
         System.out.println("Q is full so waiting"); 
         q.wait(); 
        } catch (InterruptedException ex) { 
         ex.printStackTrace(); 
        } 
       } 
      } 
     } 
    } 
} 

你必須與Consumer類類似的問題。我的建議是:

static class Consumer implements Runnable { 
    public void run() { 
     while (true) { 
      synchronized (q) { 
       if (q.isEmpty()) { 
        try { 
         System.out.println("Q is empty so waiting "); 
         q.wait(); 
        }catch(InterruptedException ie) { 
         ie.printStackTrace(); 
        } 
       } else { 
        System.out.println(q.remove()); 
        q.notifyAll(); 
       } 
      } 
     } 
    } 
} 

注意,在這兩種情況下,鎖被保持,檢查它是否是好的進行,該方法的實際業務之間的代碼。

+0

一個小問題,我想提出。通常當一個線程等待時,它會放棄它獲取的相應的監視器,直到有人通知。一旦它從等待()中醒來,線程將已經獲得監視器。然而,你的方法是放棄獲得的監視器(通過離開同步塊)並再次獲取,這仍然有效(從我可以識別的),但增加了額外的不必要的負擔。 –

+0

@AdrianShum - 你注意到這個問題是對的。如果有多個生產者線程或消費者線程,那麼離開'synchronized'塊會給每個線程一個使用隊列的機會。如果'while'塊被帶入'synchronized'塊中,那麼一旦生產者或消費者獲得了監控器,它就會保持它,直到隊列處於容量或清空狀態(如適用)。如果這是所需的行爲,那麼反轉嵌套順序將是適當的。 –

+0

即使在同步塊中移動循環,它也不會「在顯示隊列處於容量或清空狀態之前保持顯示器」。一旦進入wait(),監視器被釋放,其他線程仍然可以「使用隊列」。唯一的區別是釋放和重新獲取顯示器的開銷。 –

1

我可以看到在您當前的實施中存在很多問題。然而,你究竟在調查什麼以及你所關注的死鎖在哪裏?我相信這應該是你所做的。

最大的問題之一是同步的範圍簡直是錯誤的,並且它引起了很多競爭條件。

以您的消費者邏輯爲例,有可能在隊列中只有1個元素。兩個消費者線程都打到if (q.isEmpty()) {,都認爲它有一些東西讓它從隊列中獲得。然後雙方都會繼續運行並運行q.remove(),這對於第一個線程來說很好,但對於下一個線程將拋出異常。

競爭條件的另一個例子是,消費者檢查隊列可能是空的,但在它開始同步塊之前,生產者在隊列中放置10個項目,使其滿載,然後消費者進入同步塊wait()。由於notifyAll()之前完成,消費者將失去以前的notifyAll(),因爲隊列現在已滿,生產者線程不會把任何新項目的隊列,因爲它一直等待被某人所消耗的隊列。繁榮,死鎖

您的代碼中還存在其他問題(例如,不在迴路中包裝wait())。

我強烈建議你谷歌的生產者 - 消費者隊列的一些例子(我相信有噸),並嘗試瞭解什麼是應該做的方式。


對於@回覆評論TedHopp的評論:

@ TedHopp的方式將工作,但它是不必要的釋放和重新獲取隊列的監控。

通常它應該是看起來像:

static class Producer implements Runnable { 
    public void run() { 
     while (true) { // keep on adding item 
      String st = "" + System.currentTimeMillis(); // prepare the item 
      synchronized (q) { 
       while (q.size() >= 10) { // keep on waiting when the queue is full 
        try { 
         System.out.println("Q is full so waiting"); 
         q.wait(); 
        } catch (InterruptedException ex) { 
         ex.printStackTrace(); 
         // should be properly handled by rethrowing etc. 
        } 
       } 
       q.add(st); // add item to queue as it is not full at this moment 
       q.notifyAll(); 
      } 
     } 
    } 
} 

而且,通常我們希望創建一個生產者 - 消費者隊列類,而「增加」這樣的PC-隊列的方法將包含邏輯在上面的同步塊內。

以上方式不需要額外的發佈/顯示器重新獲取,看起來更接近我應該在PC隊列來實現。

0

你應該換行接觸隊列同步的任何代碼。 例如

if (q.size() == 10) { 
    synchronized (q) { 

if語句後評估隊列的大小可能會改變。 更好地切換訂單。

你有同樣的問題在您的消費者

1

共享存儲器上的每個操作都應該防範多線程訪問。由於隊列檢查的狀態不同步,所以當前實現存在死鎖。您應該能夠在Promela中輕鬆建立該代碼的模型以獲得死鎖場景。儘管如此,你應該意識到condition_variables(synchronized section)沒有計數語義,所以如果線程在達到等待狀態之前搶佔,而另一個同時調用notifyAll()函數,那麼它不適用於它之後的預先指定的線程重新獲得控制權。 解決的辦法很簡單:

... 
while (true) 
{ 
    synchronized (q) 
    { 
     if (q.size() == 10) 
... 
while (true) 
{ 
    synchronized(q) 
    { 
     if (q.isEmpty()) 
...