我已經創建了一個具有20個內核的指定線程池的連接。只有一個線程在執行器服務中同時運行,RabbitMQ
ConnectionFactory factory = new ConnectionFactory();
....
//specified es
ExecutorService consumerExecutor = Executors.newFixedThreadPool(threadNum, threadFactory);
con = factory.newConnection(consumerExecutor, addresses);
然後從該連接建立頻道:
final Channel channel = connection.createChannel();
並使用它來創建DefaultConsumer。
雖然我發現儘管線程可以用來消費消息,但總是隻有一個線程正在消費消息,即使消息在服務器中大量累積。
我看看源代碼,並發現:
private final class WorkPoolRunnable implements Runnable {
@Override
public void run() {
int size = MAX_RUNNABLE_BLOCK_SIZE;
List<Runnable> block = new ArrayList<Runnable>(size);
try {
Channel key = ConsumerWorkService.this.workPool.nextWorkBlock(block, size);
if (key == null) return; // nothing ready to run
try {
for (Runnable runnable : block) {
runnable.run();
}
} finally {
if (ConsumerWorkService.this.workPool.finishWorkBlock(key)) {
ConsumerWorkService.this.executor.execute(new WorkPoolRunnable());
}
}
} catch (RuntimeException e) {
Thread.currentThread().interrupt();
}
}
}
/* Basic work selector and state transition step */
private K readyToInProgress() {
K key = this.ready.poll();
if (key != null) {
this.inProgress.add(key);
}
return key;
}
/**
* Return the next <i>ready</i> client,
* and transfer a collection of that client's items to process.
* Mark client <i>in progress</i>.
* If there is no <i>ready</i> client, return <code><b>null</b></code>.
* @param to collection object in which to transfer items
* @param size max number of items to transfer
* @return key of client to whom items belong, or <code><b>null</b></code> if there is none.
*/
public K nextWorkBlock(Collection<W> to, int size) {
synchronized (this) {
K nextKey = readyToInProgress();
if (nextKey != null) {
VariableLinkedBlockingQueue<W> queue = this.pool.get(nextKey);
drainTo(queue, to, size);
}
return nextKey;
}
}
訣竅應該是ConsumerWorkService.this.workPool.nextWorkBlock
,它輪詢從就緒隊列的通道,並運行回調後添加到終點塊讀隊列run()
。如果我錯了,請糾正我。
這是令人困惑的,因爲消費者綁定到一個通道,並且直到最後一個任務塊完成時纔將通道釋放到隊列中,這意味着線程池始終只爲該消費者提供一個線程。
問題:
- 爲什麼RabbitMQ的設計,此模型
- 我們如何優化這一問題
- 是好到任務提交給一個獨立的線程池
handleDelivery
消費消息以及ACK (確保消息ACK任務完成後,才)