2013-12-19 26 views
0

我有以下途徑:Apache的駱駝匯聚:輸入和輸出

<route id="my-aggregator"> 
    <from uri="direct:aggregator" /> 

    <aggregate strategyRef="myAggregationStrategy" completionSize="3"> 
     <correlationExpression> 
      <simple>${header.id}</simple> 
     </correlationExpression> 
     <to uri="bean:payloadProcessor?method=process" /> 
    </aggregate> 
</route> 

正如你所看到的,它等待,直到它已收到3間的交流,所有具有相匹配的header.id值。一旦接收到3個這樣的消息,下面的代碼執行:

public class MyAggregationStrategy implements AggregationStrategy { 
    @Override 
    public Exchange aggregate(Exchange e1, Exchange e2) { 
     // The first (of 3) message will have a `List<Widget>` on its body. Extract it here. 
     List<Widget> redWidgets = ??? // e1.getIn().getBody()? e2.getIn().getBody()? something else? 

     // The second (of 3) message will have a `List<Widget>` on its body. Extract it here. 
     List<Widget> blueWidgets = ??? // e1.getIn().getBody()? e2.getIn().getBody()? something else? 

     // The third (of 3) message will have a `List<Widget>` on its body. Extract it here. 
     List<Widget> greenWidgets = ??? // e1.getIn().getBody()? e2.getIn().getBody()? something else? 

     WidgetPayload payload = new WidgetPayload(redWidgets, blueWidgets, greenWidgets); 

     // Which "output" exchange do I set payload to, so that it gets routed on to the 
     // "payloadProcessor" bean? 
     e1.getOut().setBody(payload); /* or */ e2.getOut().setBody(payload); 

     // What do we even return? 
     return e1; /* or return e2; */ 
    } 
} 
在聚合

所以,我試圖訪問到已通過各種途徑發送到匯聚各List<Widget>,將它們組合成一個WidgetPayload實例,然後將WidgetPayload設置爲出站交換的出站/返回值。但我無法弄清楚所有這些做法的正確方法。具體做法是:

  1. 如何從e1e2提取每個所述3 List<Widget>的?
  2. e1e2甚至代表什麼?進出?新舊?還有別的嗎?
  3. 使用什麼方法調用e1/e2是否讓​​轉發到payloadProcessor bean?

在此先感謝!

+1

您是否驗證過您的聚合器代碼只有在三次交換到達後纔會調用?它應該在每個到達的交換機上被呼叫。請參閱[關於AggregationStrategy](http://camel.apache.org/aggregator2.html)。 – Ralf

+0

謝謝@Ralf(+1) - 所以一些跟進:(1)如果每次交換到達時都會調用此方法,那麼哪個Exchange將包含我的新列表以及哪個Exchange將包含我的WidgetPayload? (2)在所有3列表到達後,如何創建WidgetPayload?再次感謝! – IAmYourFaja

回答

2

看看我提到的代碼示例頁面;第一個參數是彙總交換。它在第一次調用聚合時爲空。您必須在第一次調用時創建WidgetPayload,然後在每次次後調用的過程中繼續添加小部件。然後,當您在聚合器之後到達路由步驟時,交換機的主體就是您的聚合WidgetPayload。