在描述的場景,我會派劈裂消息的路由與聚合器(姑且稱之爲「AggregationRoute」),該公司聚集戰略實現PreCompletionAwareAggregationStrategy(你已經在使用它的方式,我猜)。 然後,當分割結束時,將AGGREGATION_COMPLETE_ALL_GROUPS標頭設置爲true並將其發送到AggregationRoute。此交換僅用作完成所有聚合組的信號。
實施例:
...
.split(body()).streaming()
.to("direct:aggregationRoute")
.end()
.setHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS,constant(true))
.to("direct:aggregationRoute");
from("direct:aggregationRoute")
.aggregate([your correlation expression]), myAggregationStrategy)
...
另一種替代方法是使用AggregateController通過調用其方法forceCompletionOfAllGroups()來結束所有組的聚集:
AggregateController aggregateController = new DefaultAggregateController();
from(...)
...
.split(body()).streaming()
.aggregate([correlation expression], aggregationStrategy).aggregateController(aggregateController)
...
// Do what you need to do with the aggregated exchange
...
.end()
.end()
.bean(aggregateController, "forceCompletionOfAllGroups")
我已經登錄了票,看看是否可以通過OOTB使事情變得更加簡單:https://issues.apache.org/jira/browse/CAMEL-10474 –
您現在可以通過CAMEL-10474來做到這一點。但是我已經記錄了另一張票,以使這更簡單:https://issues.apache.org/jira/browse/CAMEL-12296 –