2016-11-11 73 views
0

我使用的是Apache Camel並獲取一個大文件以進行輸入,我必須逐行處理。內容已經排序,我必須使用相同的關聯關鍵字聚合所有連續的行。如果相關鍵發生變化,則必須完成前一個聚合。如果文件結束,最後的聚合也已完成。 我有一些限制: - 由於傳入文件相當大,我們希望以流式處理它。 - 因爲結果賦予同步端點,所以我不想使用超時完成謂詞。否則,我將失去調節數據源消耗速度的背壓,並且交換將積累在AggregateProcessor的超時映射和聚合存儲庫中。只彙總具有相同關聯密鑰的連續交換

PreCompletionAwareAggregationStrategy看起來像一個有希望的解決方案,但事實證明,最後一個聚合將不會完成,直到下一個文件到達。如果我在preComplete中使用CamelSplitComplete屬性,則最後一個聚合完成,但沒有最後一個傳入交換。相反,這最後的交換將被添加到下一個文件到達的內容。

因此,目前我很迷茫,找不到過分難看的解決方案。

+0

我已經登錄了票,看看是否可以通過OOTB使事情變得更加簡單:https://issues.apache.org/jira/browse/CAMEL-10474 –

+0

您現在可以通過CAMEL-10474來做到這一點。但是我已經記錄了另一張票,以使這更簡單:https://issues.apache.org/jira/browse/CAMEL-12296 –

回答

0

在描述的場景,我會派劈裂消息的路由與聚合器(姑且稱之爲「AggregationRoute」),該公司聚集戰略實現PreCompletionAwareAggregationStrategy(你已經在使用它的方式,我猜)。 然後,當分割結束時,將AGGREGATION_COMPLETE_ALL_GROUPS標頭設置爲true並將其發送到AggregationRoute。此交換僅用作完成所有聚合組的信號。

實施例:


    ... 
    .split(body()).streaming() 
     .to("direct:aggregationRoute") 
    .end() 
    .setHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS,constant(true)) 
    .to("direct:aggregationRoute"); 

from("direct:aggregationRoute") 
    .aggregate([your correlation expression]), myAggregationStrategy) 
    ... 

另一種替代方法是使用AggregateController通過調用其方法forceCompletionOfAllGroups()來結束所有組的聚集:


AggregateController aggregateController = new DefaultAggregateController(); 

from(...) 
    ... 
    .split(body()).streaming() 
     .aggregate([correlation expression], aggregationStrategy).aggregateController(aggregateController) 
      ... 
      // Do what you need to do with the aggregated exchange 
      ... 
     .end() 
    .end() 
    .bean(aggregateController, "forceCompletionOfAllGroups")