2016-11-21 26 views
0

我試圖測試一些與彈簧集成使用DSL的東西。這僅僅是一個測試到目前爲止,流程簡單:錯誤'是一個單向的'MessageHandler'彈簧整合聚合器DSL

  • 並聯
  • 創建一些消息
  • 過程(登錄)他們它們聚集
  • 日誌聚集

除了來自聚合器,它工作正常:

@Bean 
public IntegrationFlow integrationFlow() { 
    return IntegrationFlows 
      .from(integerMessageSource(), c -> c.poller(Pollers.fixedRate(1, TimeUnit.SECONDS))) 
      .channel(MessageChannels.executor(Executors.newCachedThreadPool())) 
      .handle((GenericHandler<Integer>) (payload, headers) -> { 
       System.out.println("\t delaying message:" + payload + " on thread " 
         + Thread.currentThread().getName()); 
       try { 
        Thread.sleep(2000); 
       } catch (InterruptedException e) { 
        System.err.println(e.getMessage()); 
       } 
       return payload; 
      }) 
      .handle(this::logMessage) 
      .aggregate(a -> 
        a.releaseStrategy(g -> g.size()>10) 
        .outputProcessor(g -> 
          g.getMessages() 
            .stream() 
            .map(e -> e.getPayload().toString()) 
            .collect(Collectors.joining(","))) 

        ) 
      .handle(this::logMessage) 
      .get(); 

} 

如果我忽略.aggregate(..)部分,則示例正在工作。

枝條的聚合,我得到以下異常:

Caused by: org.springframework.beans.factory.BeanCreationException: The 'currentComponent' (org.faboo.test.ParallelIntegrationApplication$$Lambda$9/[email protected]) is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow. 

據我瞭解,它抱怨沒有從聚合輸出?

完整的源代碼可以在這裏找到:hithub

回答

2

問題是聚合前handle() - 它不會產生任何結果所以沒什麼聚集......

 .handle(this::logMessage) 
     .aggregate(a -> 

想必logMessage(Message<?>)void返回類型。

如果您希望在聚合器之前登錄,請使用wireTap或更改logMessage以在記錄後返回Message<?>

 .wireTap(sf -> sf.handle(this::logMessage)) 
+0

謝謝,wireTap做到了。我試圖改變logMessage()來返回消息,但這並沒有改變行爲。 – bert

+0

從1.2版本開始,就有一個'.log()'操作符。請閱讀關於此事的博客文章:https://spring.io/blog/2016/10/14/java-dsl-for-spring-integration-1-2-release-is-available –