2013-10-08 136 views
5

我想並行處理來自rabbitMq隊列的消息。隊列被配置爲autoAck = false。我正在使用支持camel endpoints的camel-rabbitMQ支持,它支持threadPoolSize參數,但這不會產生預期的效果。即使threadpoolsize = 20,消息仍然在隊列中依次處理。Rabbit Mq java客戶端並行消耗

通過代碼調試,我可以看到threadpoolsize參數用於創建一個ExecutorService,用於將here中描述的傳遞給rabbit連接因子。這一切看起來都不錯,直到你進入兔子ConsumerWorkService。這裏的消息在最大16個消息的塊中處理。塊中的每條消息都是連續處理的,然後如果有更多工作要做,執行程序服務將與下一個塊一起調用。下面是一個代碼片段。從執行者服務的這種用法我看不到如何可以並行處理消息。執行者服務一次只有一件工作要執行。

我失蹤了什麼?

private final class WorkPoolRunnable implements Runnable { 

     public void run() { 
      int size = MAX_RUNNABLE_BLOCK_SIZE; 
      List<Runnable> block = new ArrayList<Runnable>(size); 
      try { 
       Channel key = ConsumerWorkService.this.workPool.nextWorkBlock(block, size); 
       if (key == null) return; // nothing ready to run 
       try { 
        for (Runnable runnable : block) { 
         runnable.run(); 
        } 
       } finally { 
        if (ConsumerWorkService.this.workPool.finishWorkBlock(key)) { 
         ConsumerWorkService.this.executor.execute(new WorkPoolRunnable()); 
        } 
       } 
      } catch (RuntimeException e) { 
       Thread.currentThread().interrupt(); 
      } 
     } 
+0

您能配置ConsumerWorkService使用不同的塊大小嗎? –

+1

嗨,克勞斯,我已經通過Github對Camel-rabbitmq組件進行了一些更改,如Fergus Nelson。我已經改變了RabbitMq消費者爲每個需要消費者的消費者建立一個渠道。我會在測試所有內容時創建Jira + pull請求。 –

+0

@ mR_fr0g,據我所知,你已經通過在Camel-RabbitMQ組件中創建多個通道來解決了這個問題。您能否提供鏈接到您的Jira票證,拉取請求並指定該修復程序存在於哪個Camel版本中? – wheleph

回答

3

的RabbitMQ的文檔是不是這個很清楚,但是,即使ConsumerWorkService使用線程池,這個池似乎並不在某種程度上被用於並行處理消息:

每個通道都有自己的調度線程。對於每個頻道一個消費者最常見的使用情況,這意味着消費者不支持其他消費者。如果每個渠道有多個消費者,請注意長時間運行的消費者可能會阻止向該渠道上的其他消費者分派回叫。

http://www.rabbitmq.com/api-guide.html

該文檔建議使用一個Channel每個線程,事實上,如果你只是創造儘可能多Channel S作爲併發的要求的水平,消息將鏈接到消費者之間出動這些渠道。

我測試過2個通道和消費者:當2條消息在隊列中時,每個消費者一次只能選擇一條消息。你提到的16條消息塊似乎不會干涉,這是一件好事。事實上,Spring AMQP還創建了幾個同時處理消息的通道。

我還測試了這款被按預期工作。

3

如果您有一個單一的Channel實例,它會在您正確檢查ConsumerWorkService時正確調用註冊的消費者。有兩種方法可以解決這個問題:

  1. 使用多個通道而不是一個通道。
  2. 使用單一渠道,但以特殊方式實施消費者。他們應該從隊列中選擇傳入的消息,並將其作爲任務放入內部線程池中。

你可以在this post找到更多詳細信息。