2014-10-31 114 views
3

我想使用簡單的處理器/異步處理器作爲目的地並行處理隊列中的消息。處理器每條消息需要一點時間,但每條消息都可以單獨處理,因此同時處理(在健康邊界內)。與ActiveMQ並行處理多條消息

我很難找到例子,特別是關於駱駝路線的xml配置。

到目前爲止,我已經定義了一個線程池,路線和處理器:

<threadPool id="smallPool" threadName="MyProcessorThread" poolSize="5" maxPoolSize="50" maxQueueSize="100"/> 
<route> 
    <from uri="broker:queue:inbox" /> 
    <threads executorServiceRef="smallPool"> 
     <to uri="MyProcessor" /> 
    </threads> 
</route> 
<bean id="MyProcessor" class="com.example.java.MyProcessor" /> 

和我的處理器的樣子:

public class MyProcessor implements Processor { 
    @Override 
    public void process(Exchange exchange) throws Exception { 
     Message in = exchange.getIn(); 
     String msg = in.getBody(String.class);  
     System.out.println(msg); 
     try { 
      Thread.sleep(10 * 1000); // Do something in the background 
     } catch (InterruptedException e) {} 
     System.out.println("Done!"); 
    } 
} 

不幸的是,當我發帖的隊列,他們是仍然一一處理,每個延遲10秒(我的「後臺任務」)。

任何人都可以指向正確的方向使用定義的線程池處理消息或解釋我做錯了什麼?

+2

你試過concurrentConsumers?在你的例子中,這將是broker:queue:inbox?concurrentConsumers = 5(或其他)。我不記得每個消費者線程是否會使用它自己的處理器實例,但除非您在處理器中啓動一個新線程,否則您將需要多個線程,因爲在再次調用「from」之前,線路必須完成。 – 2014-10-31 19:04:28

+0

你看過駱駝負載平衡器嗎? http://camel.apache.org/load-balancer.html – Fortyrunner 2014-10-31 21:15:17

+0

@Fortyrunner我認爲負載平衡器將用於出站或端點。在這種情況下,併發消費者會做到這一點。 – 2014-11-01 06:43:59

回答

5

您應該使用concurrentConsumers選項在評論中說,

<route> 
    <from uri="broker:queue:inbox?concurrentConsumers=5" /> 
    <to uri="MyProcessor" /> 
</route> 

通知也有maxConcurrentConsumers您可以設置爲使用最小/最大範圍的併發消費者的,所以駱駝會自動生長/取決於收縮負載。

看到JMS文檔的詳細信息在

+0

有沒有辦法在運行時動態更改'concurrentConsumers'屬性?駱駝提供這種能力?一種增加或減少運行時消費者數量的方法。也許通過JMX或HawtIO mgtm控制檯。 – Tuelho 2015-11-25 18:01:04

+0

我們有什麼方法可以動態改變'concurrentConcumers'嗎? @Tuelho你有解決方案嗎? – Kuldeep 2016-08-05 10:00:00

+1

您可以使用JMX /從hawtio Web控制檯更改它 – 2016-08-07 12:33:29