2013-11-14 118 views
3

我花了差不多兩天的時間去了解apache駱駝中concurrentConsumers vs線程的概念。但我真的不明白這個概念。任何人都可以幫助我理解這個概念。我正在使用駱駝2.12.0。Apache Camel concurrentConsumers vs threads

from("jms:queue:start?concureentConsumers=5") 
    .threads(3, 3, "replyThread") 
    .bean(new SomeBean()); 

    pom.xml 
    ========================================== 
    <properties> 
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
    <camel.version>2.12.0</camel.version> 
</properties> 

<dependencies> 
    <dependency> 
     <groupId>junit</groupId> 
     <artifactId>junit</artifactId> 
     <version>4.8.1</version> 
     <scope>test</scope> 
    </dependency> 

    <!-- required by both client and server --> 
    <dependency> 
     <groupId>org.apache.camel</groupId> 
     <artifactId>camel-core</artifactId> 
     <version>${camel.version}</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.camel</groupId> 
     <artifactId>camel-spring</artifactId> 
     <version>${camel.version}</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.camel</groupId> 
     <artifactId>camel-jms</artifactId> 
     <version>${camel.version}</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.camel</groupId> 
     <artifactId>camel-test</artifactId> 
     <version>${camel.version}</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.camel</groupId> 
     <artifactId>camel-activemq</artifactId> 
     <version>1.1.0</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.camel</groupId> 
     <artifactId>camel-test-spring</artifactId> 
     <version>2.12.1</version> 
    </dependency> 
</dependencies> 


    //posting to queue 
    for (int i = 0; i < 10; i++) { 
     System.out.println(Thread.currentThread() + " sending request..." + i); 
     template.asyncCallbackRequestBody("jms:queue:start", (body + " " + i), callback); 
    } 

    //my call back 
    public class MyCallback extends SynchronizationAdapter { 

    @Override 
    public void onComplete(Exchange exchange) { 
     String body = exchange.getOut().getBody(String.class); 
     System.out.println(Thread.currentThread() + " Callback Resposne..." +body); 
} 
} 

    public static class SomeBean { 
    public void someMethod(Exchange body) { 
     System.out.println(Thread.currentThread() + " Received: " + body.getIn().getBody()); 
     body.getOut().setBody(body.getIn().getBody()); 
    } 
} 

我的理解(來自apache駱駝文檔)是否有5個競爭消費者對jms:啓動隊列消費消息。和3「replyThreads」處理來自jms:start隊列的異步回覆。但實際輸出是不同的。

 Thread[main,5,main] sending request...0 
    Thread[main,5,main] sending request...1 
    Thread[Camel (camel-1) thread #9 - replyThread,5,main] Received: Hello Camel 0 
    Thread[Camel (camel-1) thread #10 - replyThread,5,main] Received: Hello Camel 1 
    Thread[Camel (camel-1) thread #5 - ProducerTemplate,5,main] Callback Resposne...Hello Camel 0 
    Thread[Camel (camel-1) thread #6 - ProducerTemplate,5,main] Callback Resposne...Hello Camel 1 

回答

0

如果您要檢查的JMS線程池,你需要改變這樣的

from("jms:queue:start?concurrentConsumers=5") 
    .bean(new SomeBean()) 
    .threads(3, 3, "replyThread") 
    .bean(new SomeBean()); 

SomeBean路線可以告訴你,不同的線程池在駱駝航線使用。

+0

你並不真正需要的是第二個線程池,只需使用線程池從JMS那效果很好。 –

+0

啊,只要再看下去。它從駝峯2.10.3開始說你不再需要線程()。 –

3

JMS組件具有內置的線程池,它根據消息隊列的數量上下擴展和縮減。

所以只需要使用

from("jms:queue:start?concurrentConsumers=5") 
    .bean(new SomeBean()); 

你也可以指定一個最大所以有一系列

from("jms:queue:start?concurrentConsumers=5&maxConcurrentConsumers=10") 
    .bean(new SomeBean()); 
+0

感謝您的快速回復。當我在http://camel.apache.org/jms.html @ Section「通過JMS請求回覆」 閱讀了這篇文檔時,我很困惑,它提到了.threads(5)需要定義我們是否想要處理Async回覆。 – Ramki