我想並行處理來自rabbitMq隊列的消息。隊列被配置爲autoAck = false。我正在使用支持camel endpoints的camel-rabbitMQ支持,它支持threadPoolSize參數,但這不會產生預期的效果。即使threadpoolsize = 20,消息仍然在隊列中依次處理。Rabbit Mq java客戶端並行消耗
通過代碼調試,我可以看到threadpoolsize參數用於創建一個ExecutorService,用於將here中描述的傳遞給rabbit連接因子。這一切看起來都不錯,直到你進入兔子ConsumerWorkService
。這裏的消息在最大16個消息的塊中處理。塊中的每條消息都是連續處理的,然後如果有更多工作要做,執行程序服務將與下一個塊一起調用。下面是一個代碼片段。從執行者服務的這種用法我看不到如何可以並行處理消息。執行者服務一次只有一件工作要執行。
我失蹤了什麼?
private final class WorkPoolRunnable implements Runnable {
public void run() {
int size = MAX_RUNNABLE_BLOCK_SIZE;
List<Runnable> block = new ArrayList<Runnable>(size);
try {
Channel key = ConsumerWorkService.this.workPool.nextWorkBlock(block, size);
if (key == null) return; // nothing ready to run
try {
for (Runnable runnable : block) {
runnable.run();
}
} finally {
if (ConsumerWorkService.this.workPool.finishWorkBlock(key)) {
ConsumerWorkService.this.executor.execute(new WorkPoolRunnable());
}
}
} catch (RuntimeException e) {
Thread.currentThread().interrupt();
}
}
您能配置ConsumerWorkService使用不同的塊大小嗎? –
嗨,克勞斯,我已經通過Github對Camel-rabbitmq組件進行了一些更改,如Fergus Nelson。我已經改變了RabbitMq消費者爲每個需要消費者的消費者建立一個渠道。我會在測試所有內容時創建Jira + pull請求。 –
@ mR_fr0g,據我所知,你已經通過在Camel-RabbitMQ組件中創建多個通道來解決了這個問題。您能否提供鏈接到您的Jira票證,拉取請求並指定該修復程序存在於哪個Camel版本中? – wheleph