2012-09-11 89 views
5

我想知道是否有可能與駱駝根據交易內容進行限制。基於內容的節流

情況如下:我必須通過soap調用web服務。其中,發送給該web服務的參數是customerId。問題在於,如果給定customerId的每分鐘請求數超過1個,則web服務會發送一個錯誤。

我想知道是否有可能實現與駱駝每customerId限制。因此,不應該爲所有消息實施限制,而僅限於具有相同customerId的消息。

讓我知道我可以如何實現這一點,或者如果我需要澄清我的問題。

+2

它實際上是一個好主意,在現有的添加對組的支持Throttler EIP。這就是說,儘管這些消息將在等待發布時保存在內存中。讓我爲這個增強記錄一個JIRA。 –

+0

只是爲了記錄在這裏是票:https://issues.apache.org/jira/browse/CAMEL-5599 –

+0

@ClausIbsen - 這是一段時間,但我會拋出支持該功能! – Denise

回答

1

雖然ActiveMQ消息組肯定會解決的唯一客戶ID的並行處理,在我的評估克勞斯是正確的,引入油門每個唯一組表示未實現的功能爲駱駝/ ActiveMQ。

郵件組本身並不符合所述的SLA。雖然每組消息(由顧客ID關聯)將按照每組一個線程的順序進行處理,但只要請求花費的時間少於一分鐘即可接收響應,則不會強制每個客戶每分鐘請求一個請求。這就是說,我很想知道是否可以將消息組和節流策略結合起來,以便在JIRA中模擬功能請求。我迄今爲止的嘗試都失敗了。我在想沿着這些路線的東西:

<route> 
    <from uri="activemq:pending?maxConcurrentConsumers=10"/> 
    <throttle timePeriodMillis="60000"> 
    <constant>1</constant> 
    <to uri="mock:endpoint"/> 
    </throttle> 
</route> 

然而,節流似乎也適用於整組移動到終點的請求,而不是每一個人消費者。我不得不承認,我有點驚訝地發現這種行爲。我的期望是,油門將單獨應用於每個消費者,只要消息在JMSXGroupId標頭中包含客戶ID,就可以滿足原始問題中的SLA。

0

我遇到了類似的問題,最後想出了這裏描述的解決方案。

我的假設是:

  • 消息的順序並不重要(儘管它可以通過重新定序器來解決)
  • 每個客戶的ID信息的總量不是很大,因此運行時沒有飽和。

的溶液的方法:

  • 運行聚合1分鐘,同時使用的customerID組裝以相同的客戶ID的消息到一個列表
  • 使用分配器分裂列表成單個消息
  • 將第一條消息從分離器發送到實際服務
  • 將列表的其餘部分重新路由回聚合器。

的Java DSL版本是有點更容易理解:

final AggregationStrategy aggregationStrategy = AggregationStrategies.flexible(Object.class) 
     .accumulateInCollection(ArrayList.class); 

from("direct:start") 
    .log("Receiving ${body}") 
    .aggregate(header("customerID"), aggregationStrategy).completionTimeout(60000) 
     .log("Aggregate: releasing ${body}") 
     .split(body()) 
     .choice() 
      .when(header(Exchange.SPLIT_INDEX).isEqualTo(0)) 
       .log("*** Processing: ${body}") 
       .to("mock:result") 
      .otherwise() 
       .to("seda:delay") 
     .endChoice(); 

from("seda:delay") 
    .delay(0) 
    .to("direct:start"); 

的Spring XML版本如下所示:

<!-- this is our aggregation strategy defined as a spring bean --> 
<!-- see http://stackoverflow.com/questions/27404726/how-does-one-set-the-pick-expression-for-apache-camels-flexibleaggregationstr --> 
<bean id="_flexible0" class="org.apache.camel.util.toolbox.FlexibleAggregationStrategy"/> 
<bean id="_flexible2" factory-bean="_flexible0" factory-method="accumulateInCollection"> 
    <constructor-arg value="java.util.ArrayList" /> 
</bean> 

<camelContext xmlns="http://camel.apache.org/schema/spring"> 
     <route> 
      <from uri="direct:start"/> 
      <log message="Receiving ${body}"/> 
      <aggregate strategyRef="_flexible2" completionTimeout="60000" > 
       <correlationExpression> 
        <xpath>/order/@customerID</xpath> 
       </correlationExpression> 
       <log message="Aggregate: releasing ${body}"/> 
       <split> 
        <simple>${body}</simple> 
        <choice> 
         <when> 
          <simple>${header.CamelSplitIndex} == 0</simple> 
          <log message="*** Processing: ${body}"/> 
          <to uri="mock:result"/> 
         </when> 
         <otherwise> 
          <log message="--- Delaying: ${body}"/> 
          <to uri="seda:delay" /> 
         </otherwise> 
        </choice> 
       </split> 
      </aggregate> 
     </route> 

     <route> 
      <from uri="seda:delay"/> 
      <to uri="direct:start"/> 
     </route> 
</camelContext>