2014-02-10 30 views
0

我不知道是否有人可以闡明爲什麼GroupedExchangeAggregationStrategy沒有做我希望它應該做的事情。Camel Aggregator誤解

我有一個消息的列表,可以正確地分割並傳遞給我的路由進程。從這條路線上,聚合者應該收集所有的答案,並將它們合併在一個很好的最終交換中並返回。

但是我看到的是,每個拆分的消息都得到正確執行。但聚合器似乎並沒有把它們放在一起。相反,父路由以原始消息結束。這是我會用默認的Camel DefaultAggregationCollection和UseLatestAggregationStrategy看到的行爲。但是,如果我將策略更改爲GroupedExchangeAggregationStrategy,我不應該看到其他內容嗎?

似乎聚合器內的路由根本沒有執行。在路線的起點和終點之後,我有一條評論,但它們從不打印。

這裏是我的分裂:

   <split> 
        <simple>${body}</simple> 
        <to uri="direct:splitprocess"/> 
       </split> 
       <aggregate parallelProcessing="true" strategyRef="productAgrregator" > 
        <correlationExpression><simple>${headers.correlationId}</simple></correlationExpression> 
        <completionSize> 
         <simple>${headers.resultSize}</simple> 
        </completionSize> 
        <to uri="direct:aggregated"/> 
       </aggregate> 

這裏是路線直接:聚合:

 <route id="aggregated"> 
      <from uri="direct:aggregated"/> 
      <log message="Starting aggregation with message: ${body}"/> 
      <removeHeaders pattern="Exchange.CONTENT_ENCODING"/> 
      <transform> 
       <simple>${property.CamelGroupedExchange}</simple> 
      </transform> 
      <log message="Aggregated message: ${body}"/> 
     </route> 

而且這裏是我定義的策略:

<bean id="productAgrregator" class="org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy"/> 

另一件事令我困惑的是,我指定了parallelProcessing =「true」,但在日誌中,我仍然看到以下語句:D一個順序處理3次交換。這不應該是平行處理嗎?

而不是使用GroupedExchangeAggregationStrategy我也試着簡單地指定groupExchanges =「true」。它沒有任何幫助。

正如你所看到的,我很困惑。

下面是一個完全隔離的例子。可以使用提供JSON正文的POST請求調用它,例如:{「zero」:「」,「one」:「」}。我的期望是收到類似於: {one = 1A,zero = 0A}或{one = 0A,zero = 1A}。相反,我得到{zero =,one =}。

 <route id="split"> 
      <from uri="restlet:/split?restletMethod=POST"/> 
      <unmarshal ref="json"/> 
      <setHeader headerName="bodySize"><simple>${body.size()}</simple></setHeader> 
      <setHeader headerName="correlationId"><constant>'12345'</constant></setHeader> 
      <split> 
       <simple>${body}</simple> 
       <to uri="direct:split-process"/> 
       <aggregate parallelProcessing="true" groupExchanges="true"> 
        <camel:correlationExpression><simple>${headers.correlationId}</simple></camel:correlationExpression> 
        <camel:completionSize><simple>${headers.bodySize}</simple></camel:completionSize> 
        <camel:to uri="direct:split-aggregate"/> 
       </aggregate> 
      </split> 
     </route> 


     <route id="split-process"> 
      <from uri="direct:split-process"/> 
      <transform> 
       <simple>${body} ${headers.CamelSplitIndex}A</simple> 
      </transform> 
     </route>    

     <route id="split-aggregate"> 
      <from uri="direct:split-aggregate"/> 
      <to uri="mock:results"/> 
     </route>    
+0

不工作。駱駝實際上抱怨:無法創建路由,因爲定義沒有子集合[$ {headers.correlationId} - > []] – Klaus

+0

對不起,我的錯誤。刪除無益的評論:) – vikingsteve

+0

...和聚合超出分裂,這也正確嗎?我在這裏有一些分割和聚合的例子,但它們在java dsl中,我不太熟悉xml語法:) – vikingsteve

回答

2

這裏是我結束了:

  1. 無需使用額外的聚合。如文檔(http://camel.apache.org/splitter.html#Splitter-WhattheSplitterreturns)中所述,Split具有內置聚合器。我引用:「這個Splitter可以被看作是建立在輕量級聚合器上。」

  2. 實施自定義策略來組合交易所。 GroupedExchangeAggregationStrategy似乎只返回一條消息。我假設最後一個處理過的。用戶定製的策略是從逐字複製StringAggregationStrategy:http://camel.apache.org/aggregator2.html

那麼,到底我的路線看起來如此簡單:

 <route id="split"> 
      <from uri="restlet:/split?restletMethod=POST"/> 
      <split parallelProcessing="true" strategyRef="strategy"> 
       <tokenize token=","/> 
       <transform> 
        <simple>${body} ${headers.CamelSplitIndex}CD</simple> 
       </transform> 
      </split> 
     </route> 
+0

不錯,謝謝分享:) – vikingsteve

1

我會冒險在這裏回答。

由於您的<aggregate...>不屬於您的<split>,因此只有1條消息被髮送到聚合器,因此完成限制從未達到。

 <split> 
      <simple>${body}</simple> 
      <to uri="direct:splitprocess"/> 
      <aggregate parallelProcessing="true" strategyRef="productAgrregator" > 
       <correlationExpression><simple>${headers.correlationId}</simple></correlationExpression> 
       <completionSize> 
        <simple>${headers.resultSize}</simple> 
       </completionSize> 
       <to uri="direct:aggregated"/> 
      </aggregate> 
     </split> 

手指交叉,這將工作...;)

+0

現在,我至少在聚合器中看到日誌消息,看起來聚合是發生。但是,整個父路由的結果仍然不是聚合消息。 – Klaus