2013-11-05 23 views
0

我在多線程中很天真,並且正在嘗試學習它的概念。這是我生產者 - 消費者問題的實現。請看看,並建議我是否不正確/原油/任何其他建議,可以改善我的設計。對我實施生產者消費者的建議

static int data = 0; 
static Object obj1 = new Object(); 

static class Producer implements Runnable { 

    public void run() { 
     produce(); 
    } 

    void produce() { 
     while (true) { 

       if (data < 5){ 
        synchronized(obj1){ 
        System.out.println("Producing Data. Now Data is "+data++); 
        obj1.notifyAll(); 
        } 
        try { 
         Thread.sleep(1000); 
        } catch (InterruptedException e) { 
         // TODO Auto-generated catch block 
         e.printStackTrace(); 
        } 
       } 
       else{ 
        try { 
         System.out.println("Producer inactive"); 
         synchronized(obj1){ 
         obj1.wait(); 
         } 
         System.out.println("Producer active"); 
        } catch (InterruptedException e) { 
         // TODO Auto-generated catch block 
         e.printStackTrace(); 
        } 
       } 

     } 
    } 
} 

    static class Consumer implements Runnable{ 

     public void run(){ 
      consume(); 
     } 
     void consume() { 
      while (true) { 

        if (data > 0){ 
         synchronized(obj1){ 
         System.out.println("Consuming Data. Now Data is "+data--); 
         obj1.notifyAll(); 
         } 
         try { 
          Thread.sleep(1000); 
         } catch (InterruptedException e) { 
          // TODO Auto-generated catch block 
          e.printStackTrace(); 
         } 
        } 
        else{ 
         try { 
          System.out.println("Consumer Inactive"); 
          synchronized(obj1){ 
          obj1.wait(); 
          } 
          System.out.println("Consumer Active"); 

         } catch (InterruptedException e) { 
          // TODO Auto-generated catch block 
          e.printStackTrace(); 
         } 
        } 

      } 
     } 
    } 
+2

使用'BlockingQueue'。 –

+0

你在生產和消費什麼?你的變量'data'的值?你的練習的重點是什麼? –

+0

是的。我只是使用數據變量進行操作。 – Bhaskar

回答

1

好的幾點。生產者和消費者通常共享數據結構。靜態數據的使用非常奇怪,坦率地說沒有任何意義。通常你想分享的是一個數據結構,就像生產者和消費者之間的隊列。生產者將把事情添加到隊列的尾部,並且消費者將從隊列的頭部抽取事物(FIFO - 先進先出)。現在我沒有看到這些,所以它產生的是什麼與消費?

一個好的生產者消費者體系結構不會太在意交換什麼類型的數據,所以你可以通過許多不同類型的東西。這就是面向對象的命令體系結構可以幫助你的地方。在這個例子中,SomeMessage表示一些對象層次結構的根,因此可以交換各種消息。

下面是你應該如何實例化一個生產者 - 消費者結構程序中的一個簡單的例子:

public class SomeClient { 
    public void start() { 
     Queue sharedQueue = new LinkedList(); 

     producer = new Producer(sharedQueue); 
     consumer = new Consumer(sharedQueue); 

     producer.start(); 
     consumer.start(); 
    } 
} 

這裏是該實現:

public class Producer implements Runnable { 
    Thread thread; 
    Queue queue; 

    public Producer(Queue queue) { 
     this.queue = queue; 
    } 

    public void start() { 
     thread = new Thread(this); 
     thread.start(); 
    } 

    public void shutdown() { 
     thread.interrupt(); // request a shutdown 
     thread.join();  // make sure we wait until Producer.thread exits before this thread continues 
    } 

    public void run() { 
     try { 
      while(!Thread.isInterrupted()) { 
       SomeMessage message = produceAMessage(); 
       synchronized(queue) { 
        queue.add(message); 
        queue.notifyAll(); 
       } 
      } 
     } catch(InterruptedException ex) { 
      System.out.println("Producer shutting down per request."); 
     } finally { 
      thread = null; 
     } 
    } 
} 

public class Consumer implements Runnable { 
    Thread thread; 
    Queue queue; 

    public Consumer(Queue queue) { 
     this.queue = queue; 
    } 

    public void start() { 
     thread = new Thread(this); 
     thread.start(); 
    } 

    public void shutdown() { 
     thread.interrupt(); // request a shutdown 
     thread.join();  // make sure we wait until Consumer.thread exits before this thread continues 
    } 

    public void run() { 
     try { 
      while(!thread.isInterrupted()) { 
       SomeMessage message = take(); 
       doSomethingWithMessage(message); 
      } 
     } catch(InterruptedException ex) { 
      System.out.println("Stop processing - consumer per request."); 
     } finally { 
      thread = null; 
     } 
    } 

    private SomeMessage take() throws InterruptedException { 
     synchronized(queue) { 
      queue.wait(); 
      return queue.remove(); 
     } 
    } 
} 

一對夫婦的事情,不同的在這個實現中。生產者和消費者共享一個Queue實例,並使用該實例在其上執行同步調用。這種方式既沒有擁有鎖,也沒有從該結構寫入或讀取。在他們添加到隊列(生產者)或從隊列(消費者)中刪除後,他們不需要使用同步。他們可以自由處理而無需彼此溝通。他們通過添加到尾部並從頭部繪製來在每個實例之間交換SomeMessage的實例。

take()方法在這段代碼中非常重要。如果沒有幫助器方法,則無法處理消息並釋放鎖。這一點非常重要,以便您的消費者可以收到消息並放開鎖定,以允許其他生產者/消費者在此特定消費者正在處理消息時添加/刪除消息。這可以儘可能快地保持吞吐量。

是的,我說生產者。這種架構允許多個生產者和多個消費者,而不需要改變生產者或消費者的內部。

請注意,捕獲InterruptedException在while循環之外。如果您想要一個可以清楚關閉的可預測程序,這一點非常重要。 InterruptedException和被中斷的概念是表現良好的Java線程的核心。如果你不知道在什麼情況下產生這個異常,你永遠不會理解Java中的多線程應用程序。這不是一個隨機事件。 Java線程不能以編程方式停止。另一個線程必須要求它自行中斷。線程必須服從請求,否則它不會停止。所以,如果我們得到一個。關掉。在這個程序中,我們只會在我們呼叫等待或通知的時候得到它,意味着當我們處理消息時,我們不會被打斷。消費者將在停止之前完成處理消息。

最後,考慮到Java中的併發庫,實現生產者 - 使用者關係實際上要容易得多,但這是一個很好的例子,說明如何在最低級別的Java中執行它以瞭解這些庫爲您做了什麼。

+0

正如我所說,我非常天真。 :) 據我所知,生產者消費者問題更多地是關於2個線程如何訪問數據結構。在這種情況下,我使用了一個縮放數據結構 - 一個變量。生產者將其遞增至5的極限,並且消費者將其遞減至0的水平。 – Bhaskar

+1

err no。生產者 - 消費者關係總是使用隊列來交易數據。關於p-c關係的重要性在於,它與許多生產者和/或消費者相比可以將處理負載分配給許多消費者(或生產者,如果需要的話)。只是在兩個線程之間交換通用數據並不具備該屬性。兩個線程可以使用很多方法交換數據:互斥量,信號量,共享內存,消息傳遞,但這些不是p-c關係。 – chubbsondubs

0

封裝消費和生產行爲可能更加可重用。在下面的代碼中,我將消費者/生產者線程的共享資源同步問題解耦,這對於解決類似的問題(如對象池和連接池)很有用。

import java.util.LinkedList; 
import java.util.Queue; 

public class ProducerConsumer { 

    public static void main(String[] args) { 
     SyncQueue syncQueue = new SyncQueue(1); 
     Producer producer = new Producer(syncQueue , 10); 
     Consumer consumer = new Consumer(syncQueue,10); 

     producer.start(); 
     consumer.start(); 
    } 

} 

class SyncQueue { 
    private Queue<Integer> queue = new LinkedList<Integer>(); 
    private Integer size; 
    public SyncQueue(Integer size) { 
     super(); 
     this.size = size; 
     this.signalledBefore = false; 
    } 

    public synchronized void put(Integer data){ 
     while(queue.size() == size){ 
      try { 
       wait(); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
     queue.add(data); 
     notifyAll(); 
    } 
    public synchronized Integer get(){ 
     while(queue.isEmpty()){ 
      try { 
       wait(); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
     Integer data = queue.remove(); 
     notifyAll(); 
     return data; 
    } 
} 

class Producer extends Thread{ 

    private SyncQueue syncQueue; 
    private Integer size; 
    public Producer(SyncQueue syncQueue, Integer size) { 
     this.syncQueue = syncQueue; 
     this.size = size; 
    } 
    @Override 
    public void run() { 
     for (Integer i = 0; i < size; i++) { 
      syncQueue.put(i); 
      System.out.println("Produced:" + i); 
      try { 
       sleep((int)Math.random()*100); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 

     } 
    } 

} 

class Consumer extends Thread{ 

    private SyncQueue syncQueue; 
    private Integer size; 

    public Consumer(SyncQueue syncQueue, Integer size) { 
     this.syncQueue = syncQueue; 
     this.size = size; 
    } 

    @Override 
    public void run() { 
     for (Integer i = 0; i < size; i++) { 
      try { 
       sleep((int)Math.random()*100); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
      System.out.println("Consumed:" + syncQueue.get()); 
     } 
    } 
} 
+0

歡迎使用堆棧溢出。你的代碼片段看起來很有希望,但請說,你的問題到底是什麼? – iluxa

+0

謝謝,我想建議Bhaskar使用隊列數據結構作爲消費者/生產者問題中的共享資源,並將同步封裝在該隊列中。 –