2014-02-05 93 views
2

我有以下駱駝路線:駱駝聚合和completionPredicate通過示例

<route id="myRoute"> 
    <from uri="direct:aggregator" /> 

    <aggregate strategy="aggregatorStrategy" completionInterval="60000" completionSize="500"> 
     <correlationExpression> 
      <xpath>/fizz/buzz</xpath> 
     </correlationExpression> 

     <to uri="bean:postProcessor?method=run" /> 
    </aggregator> 
</route> 

正如你可以看到它聚合1分鐘的時間間隔內所述第一500條消息,所述<aggregator/>接收,或所有的消息,和然後將聚合消息發送到名爲postProcessor的bean。

你可以認爲這種聚合邏輯如下:

AGGREGATE UNTIL: 
    We have received 500 messages 
    OR 
    1 minute has elapsed 

THEN: 
    Send to postProcessor 

或者僞代碼:aggregateUntil(weHave500Message() || 1minHasElapsed())。我想將此邏輯更改爲:

AGGREGATE UNTIL: 
    We have received 500 messages 
    OR 
    1 minute has elapsed 
    OR 
    A message is received that has a property called "fireNow" and a value of "true" 

THEN: 
    Send to postProcessor 

或者再次以僞代碼形式:aggregateUntil(weHave500Message() || 1minHasElapsed() || messageHasProperty("fireNow", "true"))

換句話說,聚合直到滿足3個條件中的任何一個。任何想法如何我可以實現這一點?我有一個的感覺我可以用completionPredicateeagerCheckCompletion finagle這個,但沒有看到這裏的樹木穿過樹林。

回答

3

您可以使用completionSizecompletionInterval在一起,並添加completionPredicate方程,像這樣(你需要使用eagerCheckCompletion="true",因爲我們需要測試傳入的消息,而不是聚集的消息):

<route> 
    <from uri="direct:aggregator" /> 
    <aggregate completionSize="500" completionInterval="60000" eagerCheckCompletion="true"> 
     <correlationExpression> 
      <xpath>/fizz/buzz</xpath> 
     </correlationExpression> 
     <completion-predicate> 
      <simple>${property.fireNow} == 'true'</simple> 
     </completion-predicate> 
     <to uri="bean:postProcessor?method=run" /> 
    </aggregate> 
</route> 

另一種方法是創建一個複合謂詞來測試所有三個條件,並僅使用該謂詞作爲completion-predicate,但此解決方案較差,因爲間隔超時是由單獨的線程觸發的,而使用複合謂詞則不存在觸發聚合的線程超時到期。

+0

謝謝@EmirCalabuch(+1) – AdjustingForInflation