2012-06-07 47 views
5

我有一個多線程應用程序,它有一個生產者線程和多個消費者線程。 數據存儲在共享線程安全集合中,並在緩衝區中有足夠數據時刷新到數據庫。多線程Java應用程序中的數據緩衝

從javadocs中 -

BlockingQueue<E> 

隊列支持兩個附加等待存儲元素時該隊列,以獲取元素時變爲非空,並等待空間在隊列中變得可用的操作。

take() 

獲取並移除此隊列的頭部,如有必要則等待直到一個可用元素。

我的問題 -

  1. 是否有具有E []取(INT n)的方法的另一個集合?即阻塞隊列等待一個元素可用。我想要的是 ,它應該等到100或200個元素可用。
  2. 另外,有沒有另一種方法可以用來解決這個問題而無需輪詢?
+0

應該將元素平均分配給每個消費者,還是第一個消費者對take方法獲得第一個'n'元素,第二個消費者獲得下一個'n'元素等? – SimonC

+0

這真的是你想要做的嗎?如果生產速度超出最終調整的速度,則可能會在數據生成和刷新到數據庫之間產生幾乎任意大的延遲。如果你真的需要做到這一點緩衝的所有邏輯也許應該更喜歡「等我有N個元素或者X毫秒過去了」 – DRMacIver

+0

你爲什麼要等待?爲什麼不使用'drain()'?我會將所有可用的數據寫入最大值,我寧願不丟失數據。 –

回答

2

我認爲唯一的辦法就是要麼擴展一些BlockingQueue的實現或者使用take創建某種實用方法:

public <E> void take(BlockingQueue<E> queue, List<E> to, int max) 
     throws InterruptedException { 

    for (int i = 0; i < max; i++) 
     to.add(queue.take()); 
} 
+0

實際上,假設只有一個消費者,你的方法比我的要好得多。 – Zarkonnen

+1

這種方法根本不能很好地處理InterruptedException,因爲如果中斷,則會丟失任何元素。如果它不打算處理中斷本身,或者爲了捕獲並返回到目前爲止已耗盡的元素,它確實需要將元素添加到傳入的集合中。 – DRMacIver

+0

哦,評論點+1。更新! – dacwe

1

我不知道是否有在具有take(int n)型方法標準庫相似的類,但你應該能夠包住默認BlockingQueue添加該功能沒有太多的麻煩,你不認爲?

另一種情況是觸發一個操作,在該操作中,您將元素置於集合中,其中由您設置的閾值將觸發沖洗。

2

drainTo方法不是你正在尋找的東西,而是它會滿足你的目的嗎?

http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/BlockingQueue.html#drainTo(java.util.Collection,INT)

編輯

你可以實現一個稍微更高性能的批量阻擋takemin使用takedrainTo組合:

public <E> void drainTo(final BlockingQueue<E> queue, final List<E> list, final int min) throws InterruptedException 
{ 
    int drained = 0; 
    do 
    { 
    if (queue.size() > 0) 
     drained += queue.drainTo(list, min - drained); 
    else 
    { 
     list.add(queue.take()); 
     drained++; 
    } 
    } 
    while (drained < min); 
} 
+1

這是他/她想要做的相當有什麼固定利率。 – posdef

+0

我已經更新了答案,表明它不能解決確切的問題。有時,OP不知道其他解決方案,所以總是值得提問。 – SimonC

+0

廣東話反駁這:) – posdef

1

所以這應該是一個線程安全的隊列,可以阻止任意數量的元素。驗證線程代碼是否正確的更多目光是值得歡迎的。

package mybq; 

import java.util.ArrayList; 
import java.util.LinkedList; 
import java.util.List; 

public class ChunkyBlockingQueue<T> { 
    protected final LinkedList<T> q = new LinkedList<T>(); 
    protected final Object lock = new Object(); 

    public void add(T t) { 
     synchronized (lock) { 
      q.add(t); 
      lock.notifyAll(); 
     } 
    } 

    public List<T> take(int numElements) { 
     synchronized (lock) { 
      while (q.size() < numElements) { 
       try { 
        lock.wait(); 
       } catch (InterruptedException e) { 
        Thread.currentThread().interrupt(); 
       } 
      } 
      ArrayList<T> l = new ArrayList<T>(numElements); 
      l.addAll(q.subList(0, numElements)); 
      q.subList(0, numElements).clear(); 
      return l; 
     } 
    } 
} 
+1

'add'中的'notifyAll'在這裏有點浪費。它應該是一個單獨的'notify',然後'take'可以在完成後還有更多的元素時再次調用'notify'。 – SimonC