2014-03-04 26 views
0

我需要實現一個消費者生產者示例。這是一個簡單的程序,我修改了一下,但我不確定是否存在潛在的問題。如果有人能幫我完善它,我將不勝感激。我現在主要的 問題是,當製片人完成時,我不知道如何阻止消費者。如何根據生產者線程狀態停止消費者線程

我曾嘗試下面的代碼,但停止()已被棄用,這也不起作用:

if (!producer.isAlive()) { 
      consumer.stop(); 
    } 

ProducerConsumer.java:

import java.util.Vector; 

    public class ProducerConsumer { 
     public static void main(String[] args) { 
      int size = 5; 
      Vector<Integer> sQ = new Vector<Integer>(size); 
      Thread consumer = new Thread(new Consumer(sQ, size)); 
      Thread producer = new Thread(new Producer(sQ, size)); 
      consumer.start(); 
      producer.start(); 

      if (!producer.isAlive()) { 
       consumer.stop(); 
      } 
     } 
    } 

    class Consumer implements Runnable { 
     Vector<Integer> sQ = new Vector<Integer>(); 
     int size; 

     public Consumer(Vector<Integer> sQ, int size) { 
      this.sQ = sQ; 
      this.size = size; 
     } 

     @Override 
     public void run() { 
      while (true) { 
       try { 
        System.out.println("Consuming element: " + consume());; 
        Thread.sleep(50); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
      } 

     } 

     private int consume() throws InterruptedException { 
      synchronized (sQ) { 

       while (sQ.isEmpty()) { 
        System.out.println("The queue is empty and " 
          + Thread.currentThread().getName() + " has to wait." 
          + "size is: " + sQ.size()); 
        sQ.wait(); 
       } 

       sQ.notifyAll(); 
       return sQ.remove(0); 
      } 

     } 

    } 

    class Producer implements Runnable { 
     Vector<Integer> sQ = new Vector<Integer>(); 
     int size; 

     public Producer(Vector<Integer> sQ, int size) { 
      this.sQ = sQ; 
      this.size = size; 
     } 

     @Override 
     public void run() { 
      for (int i = 0; i < 12; ++i) { 
       try { 
        produce(i); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
      } 
     } 

     private void produce(int i) throws InterruptedException { 

      synchronized (sQ) { 
       while (sQ.size() == size) { 
        System.out.println("The queue is full and " 
          + Thread.currentThread().getName() + " has to wait." 
          + "size is: " + sQ.size()); 
        sQ.wait(); 
       } 

       sQ.add(i); 
       sQ.notify(); 
      } 

     } 
    } 

回答

2

推薦的方法通常是在需要終止的線程上設置布爾標誌(finished或類似),然後循環while(!finished)。 (請注意,該標誌通常需要爲volatile,以便線程將看到更改。)如果線程預計會被阻塞,那麼您可以使用interrupt()重新啓動它的等待循環。

雖然您採取的整體方法似乎已過時。 BlockingQueue實現專爲簡化生產者 - 消費者實現而設計,許多這樣的問題可以通過使用Executor更加有效地處理,並在它們進入時向它發射任務,而不是手動排隊和輪詢。

+0

將它改爲while(!finished ||!sQ.isEmpty())。你知道我在哪裏可以找到使用Executor的很好的示例/模式嗎? – Flethuseo

0

使用CountdownLatch。這允許您等待它在一個線程中被降低,然後從另一個線程中降低它。它是線程安全的,專爲此用例而設計。

如果您打算使用布爾值,如其中一個提議中所建議的那樣,請使用AtomicBoolean。

通常,避免使用諸如synchronized或volatile之類的語言原語,而應使用java.concurrent包提供的更高級別的構造。如果你打算走低水平,你需要對語義有一個牢固的理解。

如果你想重用,而不是徹底改造,你可能會想用我的並行處理迭代:https://github.com/jillesvangurp/iterables-support/blob/master/src/main/java/com/jillesvangurp/iterables/ConcurrentProcessingIterable.java

您只需在的foreach輸入和它同時產生的輸出與儘可能多的線程,因爲你需要。