2012-07-26 43 views
2

我試圖運行一個使用Java中的RabbitMQ的非常基本的應用程序。我很簡單喜歡使用Java ExecutorService併發地使用消息。我的項目是使用Spring,所以我定義我的ThreadPoolExecutorFactoryBean像這樣:與ExecutorService同時使用消息

<bean class="org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean" 
     destroy-method="destroy"> 
    <property name="corePoolSize" value="8"/> 
    <property name="keepAliveSeconds" value="600"/> 
    <property name="maxPoolSize" value="16"/> 
    <property name="threadGroupName" value="CallbackQueue-Group"/> 
    <property name="threadNamePrefix" value="CallbackQueue-Worker-"/> 
</bean> 

我注入這個bean到我的主要mesasge隊列利用類,這是做這樣的事情:

this.connection = getConnectionFactory().newConnection(getQueueExecutor()); 
this.channel = this.connection.createChannel(); 
this.channel.queueDeclare(getQueueName(), true, false, false, null); 
this.channel.basicConsume(getQueueName(), false, new DefaultConsumer(this.channel) { 
    @Override public void handleDelivery(String consumerTag, Envelope envelope, 
      BasicProperties properties, byte[] body) throws IOException { 
     logger.debug("Received message {}", properties.getCorrelationId()); 

     try { Thread.sleep(3000); } catch (InterruptedException e) {}; 

     getChannel().basicAck(envelope.getDeliveryTag(), false); 
    } 
}); 

簡而言之,當我將多條消息發佈到隊列中時,即使任務需要一段時間才能執行,我應該可以看到日誌語句相當接近。但是,我看到我的消費者只在時間處理一項任務,儘管ExecutorService!什麼是更加古怪的是,我居然看到不同的線程池中服務的隊列,儘管從來沒有在同一時間:

12:43:40.650 [CallbackQueue-Worker-2] DEBUG MyApplication - Received message 65bfbba29b4965eb0674c082c73dad7c 
12:43:43.737 [CallbackQueue-Worker-3] DEBUG MyApplication - Received message 2a0b29012b13857c5a0ae8060f66dbaa 
12:43:46.755 [CallbackQueue-Worker-3] DEBUG MyApplication - Received message 3c0742f9a284ac9c6b602200254c70db 
12:43:49.769 [CallbackQueue-Worker-3] DEBUG MyApplication - Received message a462236fab19d51ba4bfea1582410a64 
12:43:52.783 [CallbackQueue-Worker-3] DEBUG MyApplication - Received message 1a4713e1066dfc9e4ec1302098450a1f 

什麼我錯在這裏做什麼?是否有我在ThreadPoolExecutorFactoryBean或我的RabbitMQ代碼中錯過的其他配置?

+1

我知道你在2年前問過這個問題。希望你可能記得。我必須實現類似的場景,我需要一個工作線程池(ExecutorService)來使用來自Rabbit隊列的消息。這方面的成功嗎? – depthofreality 2014-06-08 12:01:54

+0

您能找到解決方案嗎?我目前仍然遇到同樣的問題。 – 2015-10-16 07:16:16

回答

1

從com.rabbitmq.client.Channel的描述:

雖然通道可以由多個線程使用,重要的是要確保 只有一個線程同時執行命令。執行命令併發執行 可能會導致拋出UnexpectedFrameError爲 。

這是一個原因嗎?您的日誌顯示使用了不同的工作人員(我們看到2和3),但每次只有一名工人。

+0

不,它不應該是一個問題。消費的全部重點是它可以一次完成多個線程,以便能夠同時處理作業。爲什麼我在創建'Connection'時能夠在'newConnection'工廠方法中將'ExecutorService'傳遞給它? – 2012-07-26 20:26:12

+0

但是,您可能需要另外打開兩個或更多頻道? – Nulldevice 2012-07-26 20:45:06

+0

每個通道可以一次提供多個連接,不是嗎? – 2012-07-26 21:37:05

相關問題