2012-02-27 102 views
14

我對與Java BlockingQueue相同的數據結構感興趣,除了它必須能夠對隊列中的對象進行批處理。換句話說,我希望生產者能夠將對象放入隊列中,但讓消費者阻止take(),直到隊列達到一定的大小(批量大小)。Java BlockingQueue與批處理?

然後,一旦隊列達到批量大小,生產者必須阻止put()直到消費者已經消耗了隊列中的所有元素(在這種情況下,生產者將再次開始生產並且消費者阻止直到批次再次達到)。

是否存在類似的數據結構?或者我應該寫它(我不介意),我只是不想浪費我的時間,如果有東西在那裏。


UPDATE

也許澄清事情有點:

的情況永遠是如下。可以有多個生產者將項目添加到隊列中,但永遠不會有多個消費者從隊列中抽取項目。

現在,問題是這些設置有多個並行和串行設置。換句話說,生產者爲多個隊列生產物品,而消費者本身也可以是生產者。這可以更容易地被認爲是生產者,消費者生產者和最終消費者的有向圖。

生產者應該阻塞,直到隊列爲空(@Peter Lawrey)的原因是因爲它們中的每一個都將在一個線程中運行。如果讓它們在空間變得可用時簡單地生成,那麼最終會出現一種情況,即您有太多線程試圖一次處理太多事情。

也許這與執行服務耦合可以解決問題?

回答

9

我建議你使用BlockingQueue.drainTo(Collection, int)。您可以將它與take()一起使用,以確保獲得最少數量的元素。

使用此方法的優點是,您的批量大小隨工作負載而動態增長,並且生產者在消費者繁忙時無需阻止。即它自己優化延遲和吞吐量。


要實現完全一樣問(我認爲這是一個壞主意),可以使用的SynchronousQueue一個繁忙的消費線程。

即消耗線程做當過消費者正忙的

list.clear(); 
while(list.size() < required) list.add(queue.take()); 
// process list. 

生產者將阻止。

+0

我希望生產者在消費者忙時阻止。 – 2012-02-27 14:49:03

+1

有趣的是,大多數系統竭盡全力避免這種情況。 ;)第二個建議將做到這一點。如果你想阻止生產者,爲什麼你使用多線程?對於「製片人」來說,處理器/消費者以及你似乎並不希望他們同時運行是不是更簡單。 – 2012-02-27 14:50:55

+0

請參閱我的更新。該設計要求生產者也是如此,以便執行線程的數量保持較低。此外,它解決了生產者和消費者之間的依賴性問題。 – 2012-02-27 16:17:06

1

不是我所知道的。如果我理解正確,您希望生產者在消費者被阻止之前工作(直到它填充隊列或消費者工作(而生產者阻止時),直到它清除了隊列。如果是這種情況,我可能會建議你不需要數據結構,但是需要一種機制來阻止一方,而另一方則在互斥時間中工作。你可以爲此鎖定一個對象,並且在內部具有是完全還是空的邏輯來釋放鎖並將其傳遞給對方。所以總之,你應該自己寫:)

1

這聽起來像RingBuffer如何在LMAX Disruptor模式下工作。有關更多信息,請參閱http://code.google.com/p/disruptor/

一個非常粗略的解釋是你的主要數據結構是RingBuffer。生產者依次將數據放入環形緩衝區,消費者可以將生產者放入緩衝區的數據(如此基本上爲批處理)拉出。如果緩衝區已滿,生產者將阻塞,直到消費者完成並釋放緩衝區中的插槽。

2

這是一個快速(=簡單但未完全測試)的實現,我認爲可能適合您的請求 - 如果需要,您應該可以將其擴展爲支持完整的隊列接口。

,以提高性能,你可以切換到的ReentrantLock而不是使用「同步」的關鍵詞..

public class BatchBlockingQueue<T> { 

    private ArrayList<T> queue; 
    private Semaphore readerLock; 
    private Semaphore writerLock; 
    private int batchSize; 

    public BatchBlockingQueue(int batchSize) { 
     this.queue = new ArrayList<>(batchSize); 
     this.readerLock = new Semaphore(0); 
     this.writerLock = new Semaphore(batchSize); 
     this.batchSize = batchSize; 
    } 

    public synchronized void put(T e) throws InterruptedException { 
     writerLock.acquire(); 
     queue.add(e); 
     if (queue.size() == batchSize) { 
      readerLock.release(batchSize); 
     } 
    } 

    public synchronized T poll() throws InterruptedException { 
     readerLock.acquire(); 
     T ret = queue.remove(0); 
     if (queue.isEmpty()) { 
      writerLock.release(batchSize); 
     } 
     return ret; 
    } 

} 

希望你覺得它有用。