0

目前,我有以下駱駝路線:駱駝AggregationStrategy生產NULL濃郁的消息

<camelContext id="my-camel-context" xmlns="http://camel.apache.org/schema/spring"> 
    <propertyPlaceholder id="envProps" location="classpath:myapp.properties" /> 
    <route id="my-camel-route"> 
     <from uri="{{start.uri}}"/> 

     <setHeader headerName="id"> 
      <constant>1</constant> 
     </setHeader> 

     <to uri="bean:preProcessor?method=process" /> 

     <aggregate strategyRef="myAggregationStrategy" completionSize="1"> 
      <correlationExpression> 
       <simple>${header.id} == 1</simple> 
      </correlationExpression> 
      <to uri="bean:postProcessor?method=process" /> 
     </aggregate> 

     <to uri="bean:mailer?method=process" /> 
    </route> 
</camelContext> 

<bean id="myAggregationStrategy" class="com.me.myapp.MyAggregationStrategy" /> 
<bean id="postProcessor" class="com.me.myapp.PostProcessor" /> 
<bean id="mailer" class="com.me.myapp.Mailer" /> 

現在,我真的不聚集任何有意義的東西(completionSize=1),我真的只是測試AggregationStrategy出來。這裏是我的策略:

public class MyAggregationStrategy implements AggregationStrategy { 
    @Override 
    public Exchange aggregate(Exchange aggregatingExchange, Exchange incomingExchange) { 
     AppPayload payload = null; 

     if(aggregatingExchange == null) 
      payload = new AppPayload(); // This should prevent it from being NULL below in PostProcessor... 
     else 
      payload = (AppPayload)incomingExchange.getIn().getBody(); 

     payload.setCargo((Order)incomingExchange.getIn().getBody()); 

     if(aggregatingExchange == null) { 
      incomingExchange.getIn().setBody(payload); 
      return incomingExchange; 
     } 
     else 
      return aggregatingExchange; 
    } 
} 

,也是我postProcessor豆:

public class PostProcessor implement Processor { 
    @Override 
    public void process(Exchange exchange) { 
     try { 
      System.out.println("In PostProcessor..."); 
      AppPayload payload = (AppPayload)exchange.getIn().getBody(); 
      System.out.println("\t...payload acquired..."); 

      if(payload == null) 
       System.out.println("Payload is NULL."); 
     } catch(Throwable throwable) { 
      System.out.println(ExceptionUtils.getFullStackTrace(throwable)); 
     } 
    } 
} 

當我運行這段代碼,我看到日誌從我preProcessor的bean inidcate它是正確執行的消息。而且我還看到,MyAggregationStrategy正確地「聚合」該消息,然後在第一條消息到達之後讓它傳遞到postProcessor(同樣,因爲completionSize=1)。不過,我得到以下輸出postProcessor

In PostProcessor... 
    ...payload acquired... 
Payload is NULL. 

有人能看到爲什麼​​是NULL?不應該在MyAggregationStrategy內初始化?!?我很高興發佈更多代碼,但我相信這是源於我錯誤地使用了AggregationStrategy API。

回答

1

我相信你會對aggregatingExchangeincomingExchange感到困惑。你可以試試這個:

public class MyAggregationStrategy implements AggregationStrategy { 
    @Override 
    public Exchange aggregate(Exchange aggregatingExchange, Exchange incomingExchange) { 
     AppPayload payload = null; 

     if(aggregatingExchange == null) { 
     payload = new AppPayload(); // This should prevent it from being NULL below in PostProcessor... 
     } else { 
      payload = (AppPayload)aggregatingExchange.getIn().getBody(); 
     } 

     payload.setCargo((Order)incomingExchange.getIn().getBody()); 

     if(aggregatingExchange == null) { 
      incomingExchange.getIn().setBody(payload); 
      return incomingExchange; 
     } else { 
      return aggregatingExchange; 
     } 
    } 
} 
+0

釘它 - 感謝! – AdjustingForInflation

0

添加到@ hveiga已經提到的。 我有一個類似的問題,我通過向我的消息添加標題來解決問題。 但在你的情況下,我看到你沒有使用分離器,並且你已經定義了一個頭。因此,從Clauss Ibssen得到的一條信息是第一次交換將是空的,我們需要檢查空對象。

更多解釋見本 - Apache Camel - Split and aggregate - Old Exchange is always null

軌道這裏的完整的解釋 - http://camel.465427.n5.nabble.com/Split-and-Aggregate-Old-Exchange-is-null-everytime-in-AggregationStrategy-td5746365.html