2012-10-13 154 views
3

我有一個隊列,其中多個消息,都是獨立的,可以並行處理。駱駝JMS組件和並行處理

我已經看到了mutlicast路由器,該路由器採用相同的消息,並在多個接收器拆分它,我想我也嘗試了一些其他類似的油門飛行路線政策,線程池的個人資料等

然而,是什麼我確實想要一條路由來封裝多個並行的JMS會話,這是我希望配置並且一次處理所有消息的總數。

我想說一下我的一個假設,即一條具有from的路線意味着一個會話而不是n個並行會話。如果我錯了,請糾正我。

我的駱駝背景下看起來有點像這樣:

<bean id="throttlePolicy" class="org.apache.camel.impl.ThrottlingInflightRoutePolicy"> 
    <!-- define the scope to be context scoped so we measure against total 
     inflight exchanges that means for both route1, route2 and route3 all together --> 
    <property name="scope" value="Context" /> 
    <!-- when we hit > 20 inflight exchanges then kick in and suspend the routes --> 
    <property name="maxInflightExchanges" value="20" /> 
    <!-- when we hit lower than 10% of the max = 2 then kick in and resume 
     the routes the default percentage is 70% but in this demo we want a low value --> 
    <property name="resumePercentOfMax" value="10" /> 
    <!-- output throttling activity at WARN level --> 
    <property name="loggingLevel" value="WARN" /> 
</bean> 

<camelContext id="camel" errorHandlerRef="dlc" 
    xmlns="http://camel.apache.org/schema/spring"> 

    <!-- this is the Dead Letter Channel error handler, where we send failed 
     message to a log endpoint --> 
    <errorHandler id="dlc" type="DeadLetterChannel" 
     deadLetterUri="jms:deadLetterQueue"> 
     <redeliveryPolicy retryAttemptedLogLevel="INFO" 
      maximumRedeliveries="3" redeliveryDelay="250" backOffMultiplier="2" 
      useExponentialBackOff="true" /> 
    </errorHandler> 

    <route routePolicyRef="throttlePolicy"> 
     <from uri="jms:camel.design.md5InputQueue" /> 
     <transacted ref="required" /> 
     <process ref="basicProcessor" /> 
     <to uri="jms:camel.integrationTest.reply" /> 
    </route> 
</camelContext> 

正如你所看到的,我在做什麼是計算源的MD5。我希望能夠做到這一點,並將結果傳輸到回覆隊列中,並將所有這些並行處理。

爲了模擬這個,我在基本處理器中放了一個睡眠(一秒鐘),我所看到的是一個接一個的順序處理消息而不是並行處理。例如,如果有10條消息需要10秒,如果有20條消息,則需要20秒等等。

如何獲得並行工作並具有所有MD5計算,例如完成10條輸入消息即使在處理器中放置睡眠條件後,也可以在約2秒鐘內完成。

回答

2

只需設置maxConcurrentConsumers屬性從您的隊列啓用多個消費者線程...

<route routePolicyRef="throttlePolicy"> 
    <from uri="jms:camel.design.md5InputQueue?maxConcurrentConsumers=5" /> 
    <transacted ref="required" /> 
    <process ref="basicProcessor" /> 
    <to uri="jms:camel.integrationTest.reply" /> 
</route> 
+1

感謝Boday,這沒有工作和一個小的調整也讓我更清楚,或者我真正想要的。我設置了併發消費者,這就是我一直在尋找的東西。 – josecjacob