0
我有以下途徑:Apache的駱駝匯聚:輸入和輸出
<route id="my-aggregator">
<from uri="direct:aggregator" />
<aggregate strategyRef="myAggregationStrategy" completionSize="3">
<correlationExpression>
<simple>${header.id}</simple>
</correlationExpression>
<to uri="bean:payloadProcessor?method=process" />
</aggregate>
</route>
正如你所看到的,它等待,直到它已收到3間的交流,所有具有相匹配的header.id
值。一旦接收到3個這樣的消息,下面的代碼執行:
public class MyAggregationStrategy implements AggregationStrategy {
@Override
public Exchange aggregate(Exchange e1, Exchange e2) {
// The first (of 3) message will have a `List<Widget>` on its body. Extract it here.
List<Widget> redWidgets = ??? // e1.getIn().getBody()? e2.getIn().getBody()? something else?
// The second (of 3) message will have a `List<Widget>` on its body. Extract it here.
List<Widget> blueWidgets = ??? // e1.getIn().getBody()? e2.getIn().getBody()? something else?
// The third (of 3) message will have a `List<Widget>` on its body. Extract it here.
List<Widget> greenWidgets = ??? // e1.getIn().getBody()? e2.getIn().getBody()? something else?
WidgetPayload payload = new WidgetPayload(redWidgets, blueWidgets, greenWidgets);
// Which "output" exchange do I set payload to, so that it gets routed on to the
// "payloadProcessor" bean?
e1.getOut().setBody(payload); /* or */ e2.getOut().setBody(payload);
// What do we even return?
return e1; /* or return e2; */
}
}
在聚合
所以,我試圖訪問到已通過各種途徑發送到匯聚各List<Widget>
,將它們組合成一個WidgetPayload
實例,然後將WidgetPayload
設置爲出站交換的出站/返回值。但我無法弄清楚所有這些做法的正確方法。具體做法是:
- 如何從
e1
和e2
提取每個所述3List<Widget>
的? e1
和e2
甚至代表什麼?進出?新舊?還有別的嗎?- 使用什麼方法調用
e1
/e2
是否讓轉發到payloadProcessor
bean?
在此先感謝!
您是否驗證過您的聚合器代碼只有在三次交換到達後纔會調用?它應該在每個到達的交換機上被呼叫。請參閱[關於AggregationStrategy](http://camel.apache.org/aggregator2.html)。 – Ralf
謝謝@Ralf(+1) - 所以一些跟進:(1)如果每次交換到達時都會調用此方法,那麼哪個Exchange將包含我的新列表以及哪個Exchange將包含我的WidgetPayload? (2)在所有3列表到達後,如何創建WidgetPayload?再次感謝! –
IAmYourFaja