立足我的建築離這個例子:通道在RabbitMQ的空轉與工作隊列情景
設置:
- 工人同時
- 得到一個消息,每個勞工下載量文檔,需要幾秒鐘
- 工人成功下載文檔後,它會確認郵件
- 如果工人不能下載一個文件,它爲noAcks重新排隊(三級重試的最大值)
我期待到我的實現的瓶頸是造成減速的消息。因爲我使用noAck來重新排隊失敗的工人。要啓用此均勻地分佈在我的工作線程我已經設置預取1.Looking在這個問題:RabbitMQ work queue is blocking consumers - 他們看到我所看到的在下面的截圖:
要使確保工人一次只能分配一條消息,我需要將預取設置爲1,但也有人說這會導致工人順序工作而不是並行工作。
跑步在渠道層面究竟意味着什麼?我看到隊列和連接正常運行,但單個通道(每個線程一個)空閒。
編輯#1:關於將連接池傳遞給RabbitMQ連接的說明看起來很有希望。 https://www.rabbitmq.com/api-guide.html#consumer-thread-pool我使用Spring AMQP,但我認爲類似的方法可以用在這裏:
/**
* Configure a large thread pool for concurrent channels on the physical Connection
*/
@Bean
public org.springframework.amqp.rabbit.connection.CachingConnectionFactory rabbitConnectionFactory() {
logger.info("Configuring connection factory");
CachingConnectionFactory cf = new CachingConnectionFactory();
cf.setAddresses(this.rabbitMQProperties.getAddresses());
cf.setUsername(this.rabbitMQProperties.getUsername());
cf.setPassword(this.rabbitMQProperties.getPassword());
cf.setVirtualHost(this.rabbitMQProperties.getVirtualHost());
//configure a large thread pool for the connection thread
int threads = 30;
logger.info(String.format("Configuring thread pool with %d threads", threads));
ExecutorService connectionPool = Executors.newFixedThreadPool(threads);
cf.setExecutor(connectionPool);
logger.info(String.format("MQ cache mode: %s", cf.getCacheMode().toString()));
logger.info(String.format("MQ connection cache: %d", cf.getConnectionCacheSize()));
logger.info(String.format("MQ channel cache: %d", cf.getChannelCacheSize()));
return cf;
}
@Bean
AmqpTemplate rabbitTemplate(org.springframework.amqp.rabbit.connection.CachingConnectionFactory connectionFactory){
AmqpTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}