2014-07-16 151 views
1

我想寫使用Spring集成V4的DSL的API一個簡單的信息流動,這應該是這樣的:Spring集成4異步請求/響應

 -> in.ch -> Processing -> JmsGatewayOut -> JMS_OUT_QUEUE 
Gateway 
     <- out.ch <- Processing <- JmsGatewayIn <- JMS_IN_QUEUE 

與請求/響應是異步的,當我注入消息通過初始網關傳遞,消息一直傳遞到JMS_OUT_QUEUE。除了這個消息流,回覆消息被放回到JMS_IN_QUEUE中,然後由JmsGatewayIn拾取。此時,消息被處理並放置到out.ch中(我知道響應已經out.ch,因爲我有一個記錄器攔截器,它記錄了放置在那裏的消息),但是網關永遠不會收到響應。

相反的響應的,這從JMS_OUT_QUEUE拾取該消息,並放置在JMS_IN_QUEUE響應此消息流的系統外,自身JmsOutboundgateway接收javax.jms.MessageFormatException: MQJMS1061: Unable to deserialize object(我認爲這是不能從反序列化JMS回覆對象看着日誌)。

我很明顯沒有正確配置某些東西,但我不知道具體是什麼。有誰知道我錯過了什麼?使用spring-integration-core-4.0.3.RELEASE,spring-integration-jms-4.0.3.RELEASE,spring-integration-java-dsl-1.0.0.M2,spring-jms-4.0。使用Spring-Integration-Core-4.0.3.RELEASE,使用Spring-Integration-Core-4.0.3.RELEASE。 6.RELEASE。

我的網關配置如下:

@MessagingGateway 
public interface WsGateway { 

    @Gateway(requestChannel = "in.ch", replyChannel = "out.ch", 
     replyTimeout = 45000) 
    AResponse process(ARequest request); 
} 

我的集成流程配置如下:

@Configuration 
@EnableIntegration 
@IntegrationComponentScan 
@ComponentScan 
public class IntegrationConfig { 

    @Bean(name = "in.ch") 
    public DirectChannel inCh() { 
     return new DirectChannel(); 
    } 

    @Bean(name = "out.ch") 
    public DirectChannel outCh() { 
     return new DirectChannel(); 
    } 

    @Autowired 
    private MQQueueConnectionFactory mqConnectionFactory; 

    @Bean 
    public IntegrationFlow requestFlow() { 

     return IntegrationFlows.from("in.ch") 
       .handle("processor", "processARequest") 
       .handle(Jms.outboundGateway(mqConnectionFactory) 
         .requestDestination("JMS_OUT_QUEUE") 
         .correlationKey("JMSCorrelationID") 
       .get(); 
    } 

    @Bean 
    public IntegrationFlow responseFlow() { 

     return IntegrationFlows.from(Jms.inboundGateway(mqConnectionFactory) 
       .destination("JMS_IN_QUEUE")) 
       .handle("processor", "processAResponse") 
       .channel("out.ch") 
       .get(); 
    } 
} 

感謝有這方面的幫助, PM。

回答

2

首先配置的是壞:

  1. 自您開始WsGateway#process流你真的應該等待答覆那裏。 網關的請求/回覆功能基於TemporaryReplyChannel,它被放置爲headers作爲不可序列化的值。

  2. 只要您等待依賴該網關,實際上沒有理由提供replyChannel,如果您不打算在回覆中執行一些發佈 - 訂閱邏輯。

  3. 當您向JMS隊列發送消息時,您應該瞭解消費者部分可能是單獨的遠程應用程序。最後一個人可能對你的out.ch一無所知。

  4. JMS請求/回覆功能的確基於JMSCorrelationID,但還不夠。這裏還有一件事是一個ReplyTo JMS頭。因此,如果你打算髮送消費者的回覆,你應該只依靠JmsGatewayIn的東西。

所以我想你的代碼改成這樣:

@MessagingGateway 
public interface WsGateway { 

    @Gateway(requestChannel = "in.ch", replyTimeout = 45000) 
    AResponse process(ARequest request); 
} 

@Configuration 
@EnableIntegration 
@IntegrationComponentScan 
@ComponentScan 
public class IntegrationConfig { 

    @Bean(name = "in.ch") 
    public DirectChannel inCh() { 
     return new DirectChannel(); 
    } 

    @Autowired 
    private MQQueueConnectionFactory mqConnectionFactory; 

    @Bean 
    public IntegrationFlow requestFlow() { 
     return IntegrationFlows.from("in.ch") 
       .handle("processor", "processARequest") 
       .handle(Jms.outboundGateway(mqConnectionFactory) 
         .requestDestination("JMS_OUT_QUEUE") 
         .replyDestination("JMS_IN_QUEUE")) 
       .handle("processor", "processAResponse") 
       .get(); 
    } 

} 

讓我知道,如果它是適合你或試圖explian爲什麼你使用two-way網關的一個one-way情況。也許Jms.outboundAdapter()Jms.inboundAdapter()對你更好?

UPDATE

如何使用<header-channels-to-string>從Java DSL:

.enrichHeaders(e -> e.headerChannelsToString()) 
+0

嗨,你的解決方案是正確的,謝謝。我第一次嘗試用Jms輸入/輸出適配器替換Jms輸入/輸出網關,但也有配置問題。然後我試着只用JMS Out Gateway和replyDestination,就像在你的解決方案中那樣工作正常。我沒有刪除最初的@ Gateway的replyChannel,正如你所提到的那樣,這是不需要的。謝謝! –

+0

你好,我發現自己需要回到這篇文章......我有一個需求,我相信我確實需要使用單獨的JMS入站/出站適配器來允許異步響應。爲了做到這一點,我再次在WsGateway上設置了replyChannel =「out.ch」,將Jms.outboundGateway切換爲Jms.outboundAdapter和Jms.inboundAdapter,並且在發送之前在消息上設置了JMSCorrelationID標頭,但是,當響應返回並放入out.ch時,WsGateway不會收到響應消息?我的SI流程實際上比這裏顯示的要複雜一些... –

+1

在發送請求之前,嘗試使用''和'' –