我正在尋找一個允許我添加項目以處理項目並且當項目數量等於批量大小執行某些操作的類。我會用這樣的事情:通用配料類的正確方法
Batcher<Token> batcher = new Batcher<Token>(500, Executors.newFixedThreadPool(4)) {
public void onFlush(List<Token> tokens) {
rest.notifyBatch(tokens);
}
};
tokens.forEach((t)->batcher.add(t));
batcher.awaitDone();
#awaitDone後我知道所有的令牌已被通知。 #onFlush可能會做任何事情,例如,我可能想批量插入數據庫。我希望將#onFlush調用放入Executor。
我想出了一個解決方案,但它看起來像很多代碼,所以我的問題是,有沒有更好的方法,我應該這樣做?除了我實施的課程還有一個更好的實施方法嗎?看起來像我的解決方案有很多移動件。
這是我想出了代碼:
/**
* Simple class to allow the batched processing of items and then to alternatively wait
* for all batches to be completed.
*/
public abstract class Batcher<T> {
private final int batchSize;
private final ArrayBlockingQueue<T> batch;
private final Executor executor;
private final Phaser phaser = new Phaser(1);
private final AtomicInteger processed = new AtomicInteger(0);
public Batcher(int batchSize, Executor executor) {
this.batchSize = batchSize;
this.executor = executor;
this.batch = new ArrayBlockingQueue<>(batchSize);
}
public void add(T item) {
processed.incrementAndGet();
while (!batch.offer(item)) {
flush();
}
}
public void addAll(Iterable<T> items) {
for (T item : items) {
add(item);
}
}
public int getProcessedCount() {
return processed.get();
}
public void flush() {
if (batch.isEmpty())
return;
final List<T> batched = new ArrayList<>(batchSize);
batch.drainTo(batched, batchSize);
if (!batched.isEmpty())
executor.execute(new PhasedRunnable(batched));
}
public abstract void onFlush(List<T> batch);
public void awaitDone() {
flush();
phaser.arriveAndAwaitAdvance();
}
public void awaitDone(long duration, TimeUnit unit) throws TimeoutException {
flush();
try {
phaser.awaitAdvanceInterruptibly(phaser.arrive(), duration, unit);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private class PhasedRunnable implements Runnable {
private final List<T> batch;
private PhasedRunnable(List<T> batch) {
this.batch = batch;
phaser.register();
}
@Override
public void run() {
try {
onFlush(batch);
}
finally {
phaser.arrive();
}
}
}
}
一個Java 8的解決方案將是巨大的。謝謝。
感謝@Holger,爲什麼沒有我的代碼工作,如果多個線程正在呼籲#將? – 2015-01-27 19:50:15
因爲調用'add'後跟'awaitDone'匹配「check-then-act」模式。如果另一個線程執行某個操作(「add」或「awaitDone」),它就會中斷。此外,你已經用'1'配置了'Phaser',這顯然意味着調用'awaitDone'的兩個線程將不起作用。 – Holger 2015-01-28 08:27:24
所以兩個線程可以調用#add,但只有一個線程可以調用#awaitDone。這將允許多個生產者,但只有一個#awaitDone。這是我可以忍受的限制。 – 2015-11-22 20:35:12