2013-05-29 50 views
0

我有一個路由設置爲以批處理模式運行,輪詢幾千個XML文件。每個都在XML結構中進行時間戳,並使用此dateTime元素來確定XML是否應包含在批處理的進一步處理中(一種XQuery轉換)。由於這是批量路由,它在執行後自行終止。基於XQuery篩選器設置標題

由於路由需要自行關閉,所以如果每條消息都被過濾掉,我必須確保它也關閉,這就是爲什麼我不使用過濾器,而是使用.choice()語句,並在交換機上設置自定義標頭稍後在一個bean中使用,該bean將匹配併爲XQuery準備單個源文檔。

但是,我目前的做法需要第二條路線,即.choice()的兩個分支轉發到。這是必要的,因爲我似乎無法迫使兩條路徑繼續。所以我的問題是:如何擺脫這第二條路線?一種方法是將過濾器頭部設置爲一個bean,但我擔心所涉及的開銷。我認爲Camel中的XQuery過濾器將大大超過POJO,它能夠從字符串中構建XML文檔並針對它運行XQuery。

from(sourcePath + "?noop=true" + "&include=.*.xml") 
     .choice() 
      .when() 
       .xquery("[XQuery Filter]") 
       .setHeader("Filtered", constant(false)) 
       .to("direct:continue") 
      .otherwise() 
       .setHeader("Filtered", constant(true)) 
       .to("direct:continue") 
.end(); 

from("direct:continue") 
     .routeId(forwarderRouteID) 
     .aggregate(aggregationExpression) 
      .completionFromBatchConsumer() 
      .completionTimeout(DEF_COMPLETION_TIMEOUT) 
      .groupExchanges() 
     .bean(new FastQueryMerger(), "group") 
     .to("xquery:" + xqueryPath) 
     .bean(new FileModifier(interval), "setFileName") 
     .to(targetPath) 
     .process(new Processor() { 
       @Override 
       public void process(Exchange exchange) throws Exception { 
        new RouteTerminator(routeID, exchange.getContext()).start(); 
        new RouteTerminator(forwarderRouteID, exchange.getContext()).start(); 
       } 
      }) 
.end(); 

回答

1

不會.end()幫助嗎? 我指的是以下幾點:

from(sourcePath + "?noop=true" + "&include=.*.xml") 
    .choice() 
     .when() 
      .xquery("[XQuery Filter]") 
      .setHeader("Filtered", constant(false)).end() 
     .otherwise() 
      .setHeader("Filtered", constant(true)).end() 
    .aggregate(aggregationExpression) 
     .completionFromBatchConsumer() 
     .completionTimeout(DEF_COMPLETION_TIMEOUT) 
     .groupExchanges() 
    .bean(new FastQueryMerger(), "group") 
    .to("xquery:" + xqueryPath) 
    .bean(new FileModifier(interval), "setFileName") 
    .to(targetPath) 
    .process(new Processor() { 
      @Override 
      public void process(Exchange exchange) throws Exception { 
       new RouteTerminator(routeID, exchange.getContext()).start(); 
       new RouteTerminator(forwarderRouteID, exchange.getContext()).start(); 
      } 
     }); 

只是快速地測試了以下一個和它的工作:

@Produce(uri = "direct:test") 
protected ProducerTemplate testProducer; 
@EndpointInject(uri = "mock:test-first") 
protected MockEndpoint testFirst; 
@EndpointInject(uri = "mock:test-therest") 
protected MockEndpoint testTheRest; 
@EndpointInject(uri = "mock:test-check") 
protected MockEndpoint testCheck; 

@Test 
public void test() { 
    final String first = "first"; 
    final String second = "second"; 
    testFirst.setExpectedMessageCount(1); 
    testTheRest.setExpectedMessageCount(1); 
    testCheck.setExpectedMessageCount(2); 
    testProducer.sendBody(first); 
    testProducer.sendBody(second); 
    try { 
     testFirst.assertIsSatisfied(); 
     testTheRest.assertIsSatisfied(); 
     testCheck.assertIsSatisfied(); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
} 

@Override 
protected RouteBuilder createRouteBuilder() { 
    return new RouteBuilder() { 
     public void configure() { 
      from("direct:test") 
       .choice() 
        .when(body().isEqualTo("first")).to("mock:test-first") 
        .otherwise().to("mock:test-therest").end() 
        .to("mock:test-check"); 
     } 
    }; 
} 
+0

我花了相當長一段時間,回到這個問題,因爲我剛剛返回到該部分的項目。你給的第二個代碼示例工作完美,正是我所需要的。整個'.choice()'語句末尾的簡單'.end()'終止所有分支。你原來的第一段代碼段有兩個'.end()'語句,它們是不正確的,不能編譯。我相應地修改了它。再次感謝。 – Lilienthal