2012-12-12 149 views
0

我正在研究Java中生產者消費者問題的變體。我有一個生成器線程創建對象,將其放入優先級阻塞隊列中,然後傳遞到主容器,控制器,這是一個有界緩衝區。生產者消費者變種java BlockingQueues

該隊列的原因是,當主容器有一定百分比的對象A時,它只接受類型B的對象,以及我們被要求查看的其他一些場景。 我無法弄清楚代碼出了什麼問題,調試器只是從InQueue中的in.offer跳轉到Producer中的in.push。任何方向或建議,將不勝感激。因爲走錯了路你使用泛型

import java.util.concurrent.PriorityBlockingQueue; 

     public class InQueue implements Runnable { 

     Controller c; 
     private PriorityBlockingQueue in; 

     public InQueue(Controller c) { 
      this.c = c; 
      in = new PriorityBlockingQueue(); 
     } 

     public void push(C c) { 

      in.offer(c); 
      try { 
       Thread.sleep(1000); 
      } catch (InterruptedException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 
     } 

     public void run() { 
      while (true) { 
       try { 
        C temp = (C) in.take(); //will block if empty 
        c.arrive(temp); 
       } catch (InterruptedException e) {} // TODO 
      } 
     } 
    } 

public class Controller { 

    private BoundedBuffer buffer; 
    private int used; 


    Controller(int capacity) { 
     this.buffer = new BoundedBuffer(capacity); 
     used = 0; 
    } 


    public void arrive(C c) { 
     try { 
      buffer.put(c); 
      used++; 
     } catch (InterruptedException e) { } //TODO 
    } 

    public C depart() { 
     C temp = null; //BAD IDEA? 
     try { 
      temp = (C)buffer.take(); 
      used--; 
     } catch (InterruptedException e) { } //TODO 
     return temp; //could be null 
    } 
} 

回答

0

你的代碼不編譯。另一件事是沒有BoundedBuffer的默認實現。下面我通過阻塞隊列爲生產者 - 消費者問題做了一個工作實現。看看並糾正你的錯誤。

package concurrency; 

import java.util.concurrent.BlockingQueue; 
import java.util.concurrent.LinkedBlockingQueue; 

public class Producer<T> { 
    private final BlockingQueue<T> queue; 
    private final Consumer consumer; 
    private static volatile boolean isShutdown; 
    private final static Object lock = new Object(); 

    public Producer() { 
     this.queue = new LinkedBlockingQueue<T>(); 
     this.consumer = new Consumer(); 
    } 

    public void start() { 
     consumer.start(); 
    } 

    public void stop() { 
     synchronized (lock) { 
      isShutdown = true; 
     } 
     consumer.interrupt(); 
    } 

    public void put(T obj) throws InterruptedException { 
     synchronized (lock) { 
      if (isShutdown) 
       throw new IllegalStateException("Consumer Thread is not active"); 
     } 
     queue.put(obj); 
    } 

    private class Consumer extends Thread { 

     public void run() { 
      while (true) { 
       synchronized (lock) { 
        if (isShutdown) 
         break; 
       } 

       T t = takeItem(); 
       // do something with 't' 
       if(t!=null) 
       printItem(t); 
      } 
     } 

     private void printItem(T t) { 
      System.out.println(t); 
     } 

     private T takeItem() { 
      try { 
       return queue.take(); 
      } catch (InterruptedException e) { 
       Thread.currentThread().interrupt(); 
      } 
      return null; 
     } 
    } 

    public static void main(String[] args) throws InterruptedException { 

     Producer<Integer> producer = new Producer<Integer>(); 
     producer.start(); 
     for (int i = 0; i <20; i++) { 
      producer.put(i); 
      if (i >= 7) 
       Thread.sleep(500); 
     } 
     producer.stop(); 
    } 
} 
+0

沒有伴侶,我可以那樣做,你列出了什麼。泛型需要整理我會承認,但我不認爲這是問題所在。另外,我自己編寫了有界的緩衝區。 設置是生產者 - > BlockingQueue - > BoundedByffer - > OutBlockingQueue - >消費者。 – Saf