2012-10-08 70 views
5

Java是否支持任何隊列對象或機制來處理批處理?例如:我們有一個隊列(或任何想要的隊列對象),一些生產者將物品逐個推入隊列,我的目標是當我們有10個物品或超過10個物品在這個隊列中時,我們可以觸發一些處理程序在一個批次中處理它。我們有一個java隊列對象或機制來處理批處理嗎?

或者它不會自動觸發,我們需要找到一種方法在處理程序端優雅地循環隊列。

我們是否有典型的高性能對象或庫來處理這個問題?

感謝, 埃姆雷

回答

0

看一看接口java.util.Queue的API文檔,其中有幾個實現。

還有一個標準的API,Java Message Service (JMS)來處理排隊系統在不同進程之間交換消息。

+1

這是如何解決配料問題的?我不認爲它確實如此。 –

0

我認爲CountDownLatch是你需要的,或者可能是CyclicBarrier。這將允許您設置一個同步點,在發生一定數量的操作後觸發消費者,並且可以使用標準隊列作爲容器對象。

+0

可否請你提供一些代碼片段,會很好理解思路。 –

+1

當你有幾個線程正在生產或多個線程正在消耗(或兩者兼而有之)時,'CountDownLatch'特別有用。目前還不清楚是否屬於這種情況。 –

2

Queue中的批處理可以通過wait/notify來實現,就像你會阻止對資源的線程調用一樣,直到它可用或不可用。

public class MyQueue implements Queue<Object>{ 
     public synchronized List<Object> peek() { 
     if(this.list.size()>=10) 
        this.list.wait(); 
     return Collections.subList(0,10); 
    } 
     @Override 
    public boolean add(Object e) { 
     this.list.add(e); 
       if(this.list.size()>=10) 
        this.list.notifyAll(); 
     return false; 
    } 
} 

它不會自動

在這種情況下,觸發可以調用等待與指定的時間進行。

2

您可以使用BlockingQueue.drainTo()來自動獲取要執行的任務的批處理。這適用於每秒超過100K任務。

如果您需要更高性能的排隊,則可以使用更復雜的DisruptorJava Chronicle,它們可以每秒排隊進行數百萬個任務,兩者都支持自動批處理。

+0

所以你的意思是,我們需要在方法方面實現它,對吧?我們添加一個循環來運行BlockingQueue.drainTo()以獲取隊列項目的列表,然後調用處理器來處理它。 –

+0

你可以做到這一點。生產者和消費者都有方法;) –

1

下面是在處理分批對象,使用後臺線程來收集和處理的對象快速嘗試通過其他線程推到一個隊列:

public abstract class Batcher<E> implements Runnable { 

    public static interface BatchProcessor<E> { 
     public void processBatch(List<E> batch); 
    } 

    private final BlockingQueue<E> queue; 
    private final BatchProcessor<E> processor; 

    private Batcher(BlockingQueue<E> queue, BatchProcessor<E> processor) { 
     this.queue = queue; 
     this.processor = processor; 
    } 

    @Override 
    public void run() { 
     try { 
      while (true) { 
       List<E> batch = new ArrayList<E>(); 
       for (int i = 0; i < 10; i++) { 
        batch.add(queue.take()); 
       } 
       processor.processBatch(batch); 
      } 
     } catch (InterruptedException e) { 
      return; 
     } 
    } 

} 

要使用此,您可以創建一個BlockingQueue並把對象它創建一個實現BatchProcessor的實例來處理批處理,然後創建一個Batcher的實例以將對象從前者抽取到後者。