我想知道是否有可能與駱駝根據交易內容進行限制。基於內容的節流
情況如下:我必須通過soap調用web服務。其中,發送給該web服務的參數是customerId。問題在於,如果給定customerId的每分鐘請求數超過1個,則web服務會發送一個錯誤。
我想知道是否有可能實現與駱駝每customerId限制。因此,不應該爲所有消息實施限制,而僅限於具有相同customerId的消息。
讓我知道我可以如何實現這一點,或者如果我需要澄清我的問題。
我想知道是否有可能與駱駝根據交易內容進行限制。基於內容的節流
情況如下:我必須通過soap調用web服務。其中,發送給該web服務的參數是customerId。問題在於,如果給定customerId的每分鐘請求數超過1個,則web服務會發送一個錯誤。
我想知道是否有可能實現與駱駝每customerId限制。因此,不應該爲所有消息實施限制,而僅限於具有相同customerId的消息。
讓我知道我可以如何實現這一點,或者如果我需要澄清我的問題。
ActiveMQ Message Groups旨在處理這種情況。因此,如果您可以在路由中引入JMS隊列跳轉,那麼只需將JMSXGroupId標頭設置爲customerId即可。然後在另一條路線中,您可以從此隊列中消耗併發送到您的Web服務以獲取您描述的行爲。
也看到http://camel.apache.org/parallel-processing-and-ordering.html更多信息...
雖然ActiveMQ消息組肯定會解決的唯一客戶ID的並行處理,在我的評估克勞斯是正確的,引入油門每個唯一組表示未實現的功能爲駱駝/ ActiveMQ。
郵件組本身並不符合所述的SLA。雖然每組消息(由顧客ID關聯)將按照每組一個線程的順序進行處理,但只要請求花費的時間少於一分鐘即可接收響應,則不會強制每個客戶每分鐘請求一個請求。這就是說,我很想知道是否可以將消息組和節流策略結合起來,以便在JIRA中模擬功能請求。我迄今爲止的嘗試都失敗了。我在想沿着這些路線的東西:
<route>
<from uri="activemq:pending?maxConcurrentConsumers=10"/>
<throttle timePeriodMillis="60000">
<constant>1</constant>
<to uri="mock:endpoint"/>
</throttle>
</route>
然而,節流似乎也適用於整組移動到終點的請求,而不是每一個人消費者。我不得不承認,我有點驚訝地發現這種行爲。我的期望是,油門將單獨應用於每個消費者,只要消息在JMSXGroupId標頭中包含客戶ID,就可以滿足原始問題中的SLA。
我遇到了類似的問題,最後想出了這裏描述的解決方案。
我的假設是:
的溶液的方法:
的Java DSL版本是有點更容易理解:
final AggregationStrategy aggregationStrategy = AggregationStrategies.flexible(Object.class)
.accumulateInCollection(ArrayList.class);
from("direct:start")
.log("Receiving ${body}")
.aggregate(header("customerID"), aggregationStrategy).completionTimeout(60000)
.log("Aggregate: releasing ${body}")
.split(body())
.choice()
.when(header(Exchange.SPLIT_INDEX).isEqualTo(0))
.log("*** Processing: ${body}")
.to("mock:result")
.otherwise()
.to("seda:delay")
.endChoice();
from("seda:delay")
.delay(0)
.to("direct:start");
的Spring XML版本如下所示:
<!-- this is our aggregation strategy defined as a spring bean -->
<!-- see http://stackoverflow.com/questions/27404726/how-does-one-set-the-pick-expression-for-apache-camels-flexibleaggregationstr -->
<bean id="_flexible0" class="org.apache.camel.util.toolbox.FlexibleAggregationStrategy"/>
<bean id="_flexible2" factory-bean="_flexible0" factory-method="accumulateInCollection">
<constructor-arg value="java.util.ArrayList" />
</bean>
<camelContext xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="direct:start"/>
<log message="Receiving ${body}"/>
<aggregate strategyRef="_flexible2" completionTimeout="60000" >
<correlationExpression>
<xpath>/order/@customerID</xpath>
</correlationExpression>
<log message="Aggregate: releasing ${body}"/>
<split>
<simple>${body}</simple>
<choice>
<when>
<simple>${header.CamelSplitIndex} == 0</simple>
<log message="*** Processing: ${body}"/>
<to uri="mock:result"/>
</when>
<otherwise>
<log message="--- Delaying: ${body}"/>
<to uri="seda:delay" />
</otherwise>
</choice>
</split>
</aggregate>
</route>
<route>
<from uri="seda:delay"/>
<to uri="direct:start"/>
</route>
</camelContext>
它實際上是一個好主意,在現有的添加對組的支持Throttler EIP。這就是說,儘管這些消息將在等待發布時保存在內存中。讓我爲這個增強記錄一個JIRA。 –
只是爲了記錄在這裏是票:https://issues.apache.org/jira/browse/CAMEL-5599 –
@ClausIbsen - 這是一段時間,但我會拋出支持該功能! – Denise