我想使用簡單的處理器/異步處理器作爲目的地並行處理隊列中的消息。處理器每條消息需要一點時間,但每條消息都可以單獨處理,因此同時處理(在健康邊界內)。與ActiveMQ並行處理多條消息
我很難找到例子,特別是關於駱駝路線的xml配置。
到目前爲止,我已經定義了一個線程池,路線和處理器:
<threadPool id="smallPool" threadName="MyProcessorThread" poolSize="5" maxPoolSize="50" maxQueueSize="100"/>
<route>
<from uri="broker:queue:inbox" />
<threads executorServiceRef="smallPool">
<to uri="MyProcessor" />
</threads>
</route>
<bean id="MyProcessor" class="com.example.java.MyProcessor" />
和我的處理器的樣子:
public class MyProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
Message in = exchange.getIn();
String msg = in.getBody(String.class);
System.out.println(msg);
try {
Thread.sleep(10 * 1000); // Do something in the background
} catch (InterruptedException e) {}
System.out.println("Done!");
}
}
不幸的是,當我發帖的隊列,他們是仍然一一處理,每個延遲10秒(我的「後臺任務」)。
任何人都可以指向正確的方向使用定義的線程池處理消息或解釋我做錯了什麼?
你試過concurrentConsumers?在你的例子中,這將是broker:queue:inbox?concurrentConsumers = 5(或其他)。我不記得每個消費者線程是否會使用它自己的處理器實例,但除非您在處理器中啓動一個新線程,否則您將需要多個線程,因爲在再次調用「from」之前,線路必須完成。 – 2014-10-31 19:04:28
你看過駱駝負載平衡器嗎? http://camel.apache.org/load-balancer.html – Fortyrunner 2014-10-31 21:15:17
@Fortyrunner我認爲負載平衡器將用於出站或端點。在這種情況下,併發消費者會做到這一點。 – 2014-11-01 06:43:59