2016-08-04 248 views
1

我正在嘗試使用信號量的生產者 - 消費者問題。除了一個地方,該程序對我來說看起來很好。如何在使用Semphores的生產者 - 消費者中消費?

public class ProducerConsumerWithSemaphores 
{ 
    private final ArrayList<Integer> list = new ArrayList<>(5); 
    private final Semaphore semaphoreProducer = new Semaphore(1); 
    private final Semaphore semaphoreConsumer = new Semaphore(0); 

    private void produce() throws InterruptedException 
    { 
     for(int i = 0;i< 5;i++) 
     { 
      semaphoreProducer.acquire(); 
      list.add(i); 
      System.out.println("Produced: " + i); 
      semaphoreConsumer.release(); 
     } 
    } 

    private void consumer() throws InterruptedException 
    { 
     while (!list.isEmpty()) /// This line is where I have the doubt 
     { 
      semaphoreConsumer.acquire(); 
      System.out.println("Consumer: " + list.remove(list.size()-1)); 
      semaphoreProducer.release(); 
      Thread.sleep(100); 
     } 
    } 

    public static void main(String[] args) 
    { 
     final ProducerConsumerWithSemaphores obj = new ProducerConsumerWithSemaphores(); 

     new Thread(new Runnable() 
     { 
      @Override 
      public void run() 
      { 
       try 
       { 
        obj.produce(); 
       } catch (InterruptedException e) 
       { 
        e.printStackTrace(); 
       } 
      } 
     }).start(); 

     new Thread(new Runnable() 
     { 
      @Override 
      public void run() 
      { 
       try 
       { 
        obj.consumer(); 
       } catch (InterruptedException e) 
       { 
        e.printStackTrace(); 
       } 
      } 
     }).start(); 
    } 
} 

在獲取信號量之前檢查列表是否爲空是否可以?這會導致多線程環境中的任何問題嗎?

回答

1
private void consumer() throws InterruptedException 
{ 
    while (!list.isEmpty()) /// This line is where I have the doubt 

問題是,如果消費者跑得比生產者快,消費者立即退出,那麼你就沒有消費者!

正確的示例看起來像, Producer–consumer problem#Using semaphores。我相信你的意圖不是使用true作爲無限循環,因爲你希望生產者/消費者在作業完成後退出。如果這是您的意圖,您可以1.設置totalCount來結束循環。 2.或者 boolean標誌,當生產者放棄最後一個標誌時 putItemIntoBuffer將由生產者設置。該旗幟必須受到保護,以及 buffer(更新:如果有多個生產者/消費者這種方法不起作用)3.模擬EOF(從producer - consume; how does the consumer stop?採取的想法)

會不會做多線程環境中的任何問題?

您的關鍵部分(您的list)未受到保護。通常我們使用3個信號量。第三個用作互斥體來保護緩衝區。

要停止生產者/消費者,
示例代碼方法1:

public class Test3 { 

    private Semaphore mutex = new Semaphore(1); 
    private Semaphore fillCount = new Semaphore(0); 
    private Semaphore emptyCount = new Semaphore(3); 

    private final List<Integer> list = new ArrayList<>(); 

    class Producer implements Runnable { 

    private final int totalTasks; 

    Producer(int totalTasks) { 
     this.totalTasks = totalTasks; 
    } 

    @Override 
    public void run() { 
     try { 
     for (int i = 0; i < totalTasks; i++) { 
      emptyCount.acquire(); 
      mutex.acquire(); 
      list.add(i); 
      System.out.println("Produced: " + i); 
      mutex.release(); 
      fillCount.release(); 
     } 
     } catch (InterruptedException e) { 
     e.printStackTrace(); 
     } 
    } 
    } 

    class Consumer implements Runnable { 
    private final int totalTasks; 

    Consumer(int totalTasks) { 
     this.totalTasks = totalTasks; 
    } 

    @Override 
    public void run() { 
     try { 
     for (int i = 0; i < totalTasks; i++) { 
      fillCount.acquire(); 
      mutex.acquire(); 
      int item = list.remove(list.size() - 1); 
      System.out.println("Consumed: " + item); 
      mutex.release(); 
      emptyCount.release(); 
     } 
     } catch (InterruptedException e) { 
     e.printStackTrace(); 
     } 
    } 
    } 

    public void runTest() { 
    int numProducer = 3; 
    int tasksPerProducer = 10; 
    int numConsumer = 6; 
    int tasksPerConsumer = 5; 

    for (int i = 0; i < numProducer; i++) { 
     new Thread(new Producer(tasksPerProducer)).start(); 
    } 
    for (int i = 0; i < numConsumer; i++) { 
     new Thread(new Consumer(tasksPerConsumer)).start(); 
    } 
    } 

    public static void main(String[] args) throws IOException { 
    Test3 t = new Test3(); 
    t.runTest(); 
    } 
} 

示例代碼方法3:

public class Test4 { 

    private Semaphore mutex = new Semaphore(1); 
    private Semaphore fillCount = new Semaphore(0); 
    private Semaphore emptyCount = new Semaphore(3); 

    private Integer EOF = Integer.MAX_VALUE; 

    private final Queue<Integer> list = new LinkedList<>(); // need to put/get data in FIFO 

    class Producer implements Runnable { 

    private final int totalTasks; 

    Producer(int totalTasks) { 
     this.totalTasks = totalTasks; 
    } 

    @Override 
    public void run() { 
     try { 
     for (int i = 0; i < totalTasks + 1; i++) { 
      emptyCount.acquire(); 
      mutex.acquire(); 
      if (i == totalTasks) { 
      list.offer(EOF); 
      } else { 
      // add a valid value 
      list.offer(i); 
      System.out.println("Produced: " + i); 
      } 
      mutex.release(); 
      fillCount.release(); 
     } 
     } catch (InterruptedException e) { 
     e.printStackTrace(); 
     } 
    } 
    } 

    class Consumer implements Runnable { 

    @Override 
    public void run() { 
     try { 
     boolean finished = false; 
     while (!finished) { 
      fillCount.acquire(); 
      mutex.acquire(); 
      int item = list.poll(); 
      if (EOF.equals(item)) { 
      // do not consume this item because it means EOF 
      finished = true; 
      } else { 
      // it's a valid value, consume it. 
      System.out.println("Consumed: " + item); 
      } 
      mutex.release(); 
      emptyCount.release(); 
     } 
     } catch (InterruptedException e) { 
     e.printStackTrace(); 
     } 
    } 
    } 

    public void runTest() { 
    int numProducer = 3; 
    int tasksPerProducer = 10; 

    for (int i = 0; i < numProducer; i++) { 
     new Thread(new Producer(tasksPerProducer)).start(); 
    } 

    int numConsumer = numProducer; // producers will put N EOFs to kill N consumers. 
    for (int i = 0; i < numConsumer; i++) { 
     new Thread(new Consumer()).start(); 
    } 
    } 

    public static void main(String[] args) throws IOException { 
    Test4 t = new Test4(); 
    t.runTest(); 
    } 
} 
+0

謝謝@waltersu。那是我正在尋找的答案。你有什麼鏈接可以看到這個實現嗎? – Kode

0

而不是使用兩個信號燈你爲什麼不使用單個信號以便在線程之間進行同步link

另外你可以它是線程安全的ArrayBlockingQueue,以正確顯示生產者消費者問題。

+0

謝謝Sohil。我知道如何使用ArrayBlockingQueue來解決這個問題,但我正在以更細化的方式嘗試它 – Kode