2016-12-11 108 views
0

給定一個BlockingQueue<E>實例的集合,在整個集合上實現阻塞take()poll()的最有效方式是什麼?下面是代碼:阻塞隊列的「聯合」的實現

class UnionBlockingQueue<E> implements BlockingQueue<E> { 
    private final Collection<BlockingQueue<E>> sources; 

    public UnionBlockingQueue(Collection<BlockingQueue<E>> sources) { 
    this.sources = sources; 
    } 


    @Override 
    public E take() throws InterruptedException { 
    // takes first available E from the sources 
    } 

    @Override 
    public E poll(long timeout, TimeUnit unit) throws InterruptedException { 
    // polls first available E from the sources 
    } 

    // The rest is unsupported/irrelevant/out of scope 
    @Override 
    public boolean add(E e) { 
    throw new UnsupportedOperationException(); 
    } 

    @Override 
    public boolean offer(E e) { 
    throw new UnsupportedOperationException(); 
    } 
} 

我正打算包裝在一個單獨的類中的每個源隊列這將重寫add /報價方法和觸發這個類的notFullnotEmpty條件,就像在Condition樣品使用,但是,對於Java隊列是新手,我認爲可能會有更好/更安全/更高效的方法或庫。

+0

如果你有能力包裝源隊列並重寫'add'和'offer',那麼你不能簡單地向所有生產者發出一個隊列嗎? – teppic

+0

@teppic因爲它不一樣,每個人都可以重寫,但並不是每個人都有能力改變應用程序的設計,所有我在這裏給出的是一組隊列。 – Osw

+0

也許我誤解了你的意思是「覆蓋」。您是否控制生產者用戶的隊列實例的類型或實例化? – teppic

回答

2

最簡單的方式將管子排隊SynchronousQueue

public static void main(String[] args) throws InterruptedException { 

    // Input queues 
    LinkedBlockingDeque<String> q1 = new LinkedBlockingDeque<>(); 
    LinkedBlockingDeque<String> q2 = new LinkedBlockingDeque<>(); 
    LinkedBlockingDeque<String> q3 = new LinkedBlockingDeque<>(); 
    List<LinkedBlockingDeque<String>> qs = Arrays.asList(q1, q2, q3); 

    // Output queue 
    SynchronousQueue<String> combined = new SynchronousQueue<>(true); 

    // Pipe logic 
    Executor executor = Executors.newCachedThreadPool(r -> { 
     Thread t = new Thread(r, "q pipe"); 
     t.setDaemon(true); 
     return t; 
    }); 

    for (LinkedBlockingDeque<String> q : qs) { 
     executor.execute(() -> { 
      try { 
       while (!Thread.currentThread().isInterrupted()) { 
        combined.put(q.take()); 
       } 
      } catch (InterruptedException e) { 
       // done 
      } 
     }); 
    } 

    // Test 
    q1.put("foo"); 
    q2.put("bar"); 
    q3.put("baz"); 

    String e; 
    while ((e = combined.poll(100, TimeUnit.MILLISECONDS)) != null) { 
     System.out.println(e); 
    } 
} 
+0

感謝SynchronousQueue的想法,這對我來說是新的。然而,使用執行程序是我試圖避免的原因,在我的情況下,它會輕易地耗盡所有可用資源,或者在共享執行程序的情況下顯示性能不佳。對不起,我應該提到幾十個聯合實例,每個聯合實例都有幾十個源隊列。 – Osw