2015-01-27 74 views
0

我正在尋找一個允許我添加項目以處理項目並且當項目數量等於批量大小執行某些操作的類。我會用這樣的事情:通用配料類的正確方法

Batcher<Token> batcher = new Batcher<Token>(500, Executors.newFixedThreadPool(4)) { 
     public void onFlush(List<Token> tokens) { 
      rest.notifyBatch(tokens); 
     } 
    }; 

    tokens.forEach((t)->batcher.add(t)); 
    batcher.awaitDone(); 

#awaitDone後我知道所有的令牌已被通知。 #onFlush可能會做任何事情,例如,我可能想批量插入數據庫。我希望將#onFlush調用放入Executor。

我想出了一個解決方案,但它看起來像很多代碼,所以我的問題是,有沒有更好的方法,我應該這樣做?除了我實施的課程還有一個更好的實施方法嗎?看起來像我的解決方案有很多移動件。

這是我想出了代碼:

/** 
* Simple class to allow the batched processing of items and then to alternatively wait 
* for all batches to be completed. 
*/ 
public abstract class Batcher<T> { 

    private final int batchSize; 
    private final ArrayBlockingQueue<T> batch; 
    private final Executor executor; 
    private final Phaser phaser = new Phaser(1); 
    private final AtomicInteger processed = new AtomicInteger(0); 

    public Batcher(int batchSize, Executor executor) { 
     this.batchSize = batchSize; 
     this.executor = executor; 
     this.batch = new ArrayBlockingQueue<>(batchSize); 
    } 

    public void add(T item) { 
     processed.incrementAndGet(); 
     while (!batch.offer(item)) { 
      flush(); 
     } 
    } 

    public void addAll(Iterable<T> items) { 
     for (T item : items) { 
      add(item); 
     } 
    } 

    public int getProcessedCount() { 
     return processed.get(); 
    } 

    public void flush() { 
     if (batch.isEmpty()) 
      return; 

     final List<T> batched = new ArrayList<>(batchSize); 
     batch.drainTo(batched, batchSize); 
     if (!batched.isEmpty()) 
      executor.execute(new PhasedRunnable(batched)); 
    } 

    public abstract void onFlush(List<T> batch); 

    public void awaitDone() { 
     flush(); 
     phaser.arriveAndAwaitAdvance(); 
    } 

    public void awaitDone(long duration, TimeUnit unit) throws TimeoutException { 
     flush(); 
     try { 
      phaser.awaitAdvanceInterruptibly(phaser.arrive(), duration, unit); 
     } 
     catch (InterruptedException e) { 
      Thread.currentThread().interrupt(); 
     } 
    } 

    private class PhasedRunnable implements Runnable { 
     private final List<T> batch; 

     private PhasedRunnable(List<T> batch) { 
      this.batch = batch; 
      phaser.register(); 
     } 

     @Override 
     public void run() { 
      try { 
       onFlush(batch); 
      } 
      finally { 
       phaser.arrive(); 
      } 
     } 
    } 
} 

一個Java 8的解決方案將是巨大的。謝謝。

回答

1

引人矚目的是,您的代碼不適用於多個線程將項目添加到單個Batcher實例。如果我們將此限制轉換爲指定的用例,則不需要在內部使用專用的併發類。因此,我們可以累積到一個普通的ArrayList中,並在容量耗盡時將此列表換成新列表,而不需要複製項目。這允許簡化了代碼

public class Batcher<T> implements Consumer<T> { 

    private final int batchSize; 
    private final Executor executor; 
    private final Consumer<List<T>> actualAction; 
    private final Phaser phaser = new Phaser(1); 
    private ArrayList<T> batch; 
    private int processed; 

    public Batcher(int batchSize, Executor executor, Consumer<List<T>> c) { 
     this.batchSize = batchSize; 
     this.executor = executor; 
     this.actualAction = c; 
     this.batch = new ArrayList<>(batchSize); 
    } 

    public void accept(T item) { 
     processed++; 
     if(batch.size()==batchSize) flush(); 
     batch.add(item); 
    } 

    public int getProcessedCount() { 
     return processed; 
    } 

    public void flush() { 
     List<T> current = batch; 
     if (batch.isEmpty()) 
      return; 
     batch = new ArrayList<>(batchSize); 
     phaser.register(); 
     executor.execute(() -> { 
      try { 
       actualAction.accept(current); 
      } 
      finally { 
       phaser.arrive(); 
      } 
     }); 
    } 

    public void awaitDone() { 
     flush(); 
     phaser.arriveAndAwaitAdvance(); 
    } 

    public void awaitDone(long duration, TimeUnit unit) throws TimeoutException { 
     flush(); 
     try { 
      phaser.awaitAdvanceInterruptibly(phaser.arrive(), duration, unit); 
     } 
     catch (InterruptedException e) { 
      Thread.currentThread().interrupt(); 
     } 
    } 
} 

關於Java的8個具體的改進,它採用了Consumer允許無需繼承Batcher指定通過lambda表達式的最後行動。此外,PhasedRunnable被lambda表達式替換。作爲另一種簡化,Batcher<T> implements Consumer<T>其中每個Iterable支持forEach(Consumer<? super T>),這就避免了對方法addAll的需要。

所以使用情況現在看起來像:

Batcher<Token> batcher = new Batcher<>(
    500, Executors.newFixedThreadPool(4), currTokens -> rest.notifyBatch(currTokens)); 

tokens.forEach(batcher); 
batcher.awaitDone(); 
+0

感謝@Holger,爲什麼沒有我的代碼工作,如果多個線程正在呼籲#將? – 2015-01-27 19:50:15

+2

因爲調用'add'後跟'awaitDone'匹配「check-then-act」模式。如果另一個線程執行某個操作(「add」或「awaitDone」),它就會中斷。此外,你已經用'1'配置了'Phaser',這顯然意味着調用'awaitDone'的兩個線程將不起作用。 – Holger 2015-01-28 08:27:24

+0

所以兩個線程可以調用#add,但只有一個線程可以調用#awaitDone。這將允許多個生產者,但只有一個#awaitDone。這是我可以忍受的限制。 – 2015-11-22 20:35:12