2014-05-12 231 views
0

我們目前的情況。我使用Apache Camel,將大文件分割成小的交換文件(使用分割器,見下文)並驗證它們。然後我需要聚合消息,但我使用聚合器,它需要安裝壓縮大小或另一個。我可以在不設置限制的情況下加大當前文檔中的所有交流?如何無限制彙總交易

我的路線:

from("file:data?noop=true?move={{package.success}}&moveFailed={{package.failed}}") 
       .transacted() 
       .split(ExpressionBuilder.beanExpression(new InvoiceIteratorFactory(), "createIterator")) 
       .streaming() 
       .process(new ValidatorProcessor()) 
       .choice() 
       .when(new Predicate() { 
        @Override 
        public boolean matches(Exchange exchange) { 
         return exchange.getContext().getProperty(ValidatorProcessor.STATE_PROPERTY).equals(ValidatorProcessor.STATE_SUCCESS); 
        } 
       }) 
       .to("jpa:/...") 
       .otherwise() 
       .aggregate(body(String.class), new MyAggregationStrategy()).completionSize(????) 
       .to("smtps://smtp.gmail.com?username={{remote.e-mail}}&password={{remote.password}}"); 

要設置匯聚我用它來設置交換機或時間的計數,但我不知道交流將有多少是。

+0

我建議你放慢速度,寫一個更長的問題,說明你想做什麼以及爲什麼。 – inf3rno

回答

1

因此,在Camel中,拆分器EIP每次完成拆分交換時都會生成一個名爲CamelSplitComplete的標題。這個頭是一個布爾值。

我會做的是在聚合,而不是completionSize()使用completionPredicate()。因此,只要標題爲真,它將完成彙總:

from("file:data?noop=true?move={{package.success}}&moveFailed={{package.failed}}") 
    .transacted() 
    .split(ExpressionBuilder.beanExpression(new InvoiceIteratorFactory(), "createIterator")) 
    .streaming() 
    .process(new ValidatorProcessor()) 
    .choice() 
    .when(new Predicate() { 
       @Override 
       public boolean matches(Exchange exchange) { 
        return exchange.getContext().getProperty(ValidatorProcessor.STATE_PROPERTY).equals(ValidatorProcessor.STATE_SUCCESS); 
       } 
      }) 
    .to("jpa:/...") 
    .otherwise() 
    .aggregate(body(String.class), new MyAggregationStrategy()).completionPredicate(header("CamelSplitComplete") == true) 
    .to("smtps://smtp.gmail.com?username={{remote.e-mail}}&password={{remote.password}}"); 

我希望這是您正在尋找的。

+0

謝謝,我會試試看。但我還有一個問題。正如您在路由中看到的,應用程序讀取文件,保存到解析良好的DB實體,並且如果出現錯誤,它會將這些錯誤聚合到消息中,然後將其發送到電子郵件。我有這樣的問題,如何回滾所有那些在解析當前文件時提交給數據庫的實體? –