2017-06-14

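Camel JMS asynchronous request-reply

I am trying to implement a Camel route that reads request messages from a remote system's queue (System.A.out). The route looks at the message body and dynamically routes the message to another system's queue (System.B.in). The route is then complete and waits for the next message on its from queue (currently it blocks and waits for a response on a temporary queue).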

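System.B reads from its in queue (System.B.in; the consumer is not always a Camel route), processes the message, and drops a response on its out queue (System.B.out). System.B uses the JMSMessageID of the request message as the JMSCorrelationID of its response; that is the only thing it keeps from the request.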

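A Camel route (similar to the System.A.out one, but listening on System.B.out) picks up the response message and uses its JMSCorrelationID to find the request's JMSReplyTo queue (System.A.in), then puts the response on System.A's in queue for System.A to process. A request has no JMSCorrelationID and is therefore routed by its message body instead.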
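The correlation step described above can be sketched in plain Java, independent of Camel and JMS. This is only an illustration under stated assumptions: the class `ReplyToRegistry` and its method names are hypothetical, message IDs are treated as plain strings, and the store is an in-memory map (a real deployment would probably need a persistent or shared store so that pending requests survive a restart).

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: remembers which queue the reply to each request belongs on. */
final class ReplyToRegistry {

    // JMSMessageID of the request -> reply queue of the requesting system
    private final Map<String, String> replyToByMessageId = new ConcurrentHashMap<>();

    /** Record a request (a message without JMSCorrelationID) before forwarding it. */
    void rememberRequest(String jmsMessageId, String replyToQueue) {
        replyToByMessageId.put(jmsMessageId, replyToQueue);
    }

    /**
     * Look up the reply queue for a response. The entry is removed, so each
     * request is answered at most once. Empty if the ID is null or unknown.
     */
    Optional<String> resolveReplyTo(String jmsCorrelationId) {
        if (jmsCorrelationId == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(replyToByMessageId.remove(jmsCorrelationId));
    }
}
```

In this sketch the route listening on System.A.out would call `rememberRequest(messageId, "System.A.in")` before forwarding, and the route listening on System.B.out would call `resolveReplyTo(correlationId)` to pick the destination for the response.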

I am using Spring Boot and Camel 2.18.3; the message queue is IBM MQ version 8.

My route looks like this:

@Override
public void configure() throws Exception {

    //@formatter:off
    // Predicates over the headers set by evaluateIncomingMessageService
    Predicate validRoute = header("route-valid").isEqualTo(true);
    Predicate inValidRoute = header("route-valid").isEqualTo(false);
    Predicate splitRoute = header("route-split").isEqualTo(true);
    Predicate singleRoute = header("route-split").isEqualTo(false);
    Predicate validSplitRoute = PredicateBuilder.and(validRoute, splitRoute);
    Predicate validSingelRoute = PredicateBuilder.and(validRoute, singleRoute);

    from(endpoint(incomingURI)).routeId(routeId)
        .process(exchange -> {
            exchange.getIn().setHeader("route-source", format("%s-%s", incomingURI, routeId));
        })
        // Classify the message and set the routing headers
        .to(endpoint(format("bean:evaluateIncomingMessageService?method=routeMessage(*, %s)", replyToURI)))
        .choice()
            .when(validSingelRoute)
                .log(DEBUG, "Creating a Single route")
                .to(endpoint("bean:messageCoalitionService?method=saveInstruction(*)"))
                .setExchangePattern(ExchangePattern.InOut)
                .toD("${header.route-recipients}")
            .when(inValidRoute)
                .log(DEBUG, "a.b.test", format("Incoming message [%s] failed evaluation: %s", incomingURI, body()))
                .to(endpoint(deadLetterURI))
                .routeId(format("%s-%s", incomingURI, routeId))
            .when(validSplitRoute)
                .log(DEBUG, "Creating a Split route")
                .to(endpoint("bean:messageCoalitionService?method=saveInstructions(*)"))
                .setExchangePattern(ExchangePattern.InOut)
                .multicast()
                .toD("${header.route-recipients}").endChoice()
            .otherwise()
                .log(DEBUG, "a.b.test", format("Incoming message [%s] failed evaluation: %s", incomingURI, body()))
                .to(endpoint(deadLetterURI))
                .routeId(format("%s-%s", incomingURI, routeId));
}

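The Spring bean evaluateIncomingMessageService decides whether the message is a request (no correlation ID) or a response, and sets the routing headers for a request. I was hoping Camel would automatically route responses to the request's JMSReplyTo queue. If it does not, how can I do that?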

replyToURI is configured in the Camel route builder: if the route listens on System.A.out, its replyToURI will always be System.A.in.

evaluateIncomingMessageService.routeMessage looks like this:

public void routeMessage(final Exchange exchange, final String replyToURI) {
    String correlationId = exchange.getIn().getHeader("JMSCorrelationID", String.class);

    if (correlationId != null) {
        log.debug("Processing Message Response with JMSCorrelationID [{}]", correlationId);
        exchange.getIn().setHeader("JMSReplyTo", replyToURI);
    } else {
        // Request messages have NO correlationId
        log.debug("Processing Message Request with MessageID [{}] and JMSMessageID: [{}]",
            exchange.getIn().getMessageId(),
            exchange.getIn().getHeader("JMSMessageID") != null ? exchange.getIn().getHeader("JMSMessageID").toString() : exchange.getIn().getMessageId());
        String message = exchange.getIn().getBody(String.class);
        // Keep every configured route whose regular expression matches the whole body
        Set<ContentBasedRoute> validRoutes = contentBasedRouting
            .stream().filter(
                routeEntity -> Pattern.compile(
                    routeEntity.getRegularExpression(), DOTALL).matcher(message).matches()).collect(Collectors.toSet());

        if (validRoutes.isEmpty()) {
            log.warn("No valid routes found for message: [{}] ", message);
            exchange.getIn().setHeader("route-valid", false);

        } else {
            // De-duplicate the matching routes by destination
            HashMap<String, ContentBasedRoute> uniqueRoutes = new HashMap<>();
            validRoutes.stream().forEach(route -> uniqueRoutes.putIfAbsent(route.getDestination(), route));

            exchange.getIn().setHeader("route-valid", true);
            exchange.getIn().setHeader("route-count", uniqueRoutes.size());
            exchange.getIn().setHeader("JMSReplyTo", replyToURI);
            //if (exchange.getIn().getHeader("JMSMessageID") == null) {
            // exchange.getIn().setHeader("JMSMessageID", exchange.getIn().getMessageId());
            //}
            if (uniqueRoutes.size() > 1) {
                log.debug("Building a split route");
                // Build comma-separated lists, then drop the trailing comma
                StringBuilder routes = new StringBuilder();
                StringBuilder routeIds = new StringBuilder();
                StringBuilder routeRegex = new StringBuilder();
                uniqueRoutes.keySet().stream().forEach(i -> routes.append(i).append(","));
                uniqueRoutes.values().stream().forEach(j -> routeIds.append(j.getRouteId()).append(","));
                uniqueRoutes.values().stream().forEach(k -> routeRegex.append(k.getRegularExpression()).append(","));
                routes.deleteCharAt(routes.length() - 1);
                routeIds.deleteCharAt(routeIds.length() - 1);
                routeRegex.deleteCharAt(routeRegex.length() - 1);

                exchange.getIn().setHeader("route-split", true);
                exchange.getIn().setHeader("route-uuid", routeIds.toString());
                exchange.getIn().setHeader("route-regex", routeRegex.toString());
                exchange.getIn().setHeader("route-recipients", routes.toString());
            } else {
                exchange.getIn().setHeader("route-split", false);
                exchange.getIn().setHeader("route-uuid", uniqueRoutes.values().iterator().next().getRouteId());
                exchange.getIn().setHeader("route-regex", uniqueRoutes.values().iterator().next().getRegularExpression());
                exchange.getIn().setHeader("route-recipients", uniqueRoutes.values().iterator().next().getDestination());
            }
        }
    }
}
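For readers who want to try the matching logic on its own, the recipient selection above can be reduced to a small Camel-free sketch. Everything named here is an assumption for illustration: `RecipientResolver` and `Rule` are hypothetical stand-ins for the service and for ContentBasedRoute, and the destination strings in the usage note are made up. The sketch matches the whole body against each regular expression with DOTALL, de-duplicates by destination, and joins the result with commas.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

/** Hypothetical, Camel-free version of the recipient selection. */
final class RecipientResolver {

    /** Stand-in for ContentBasedRoute: a regular expression plus a destination URI. */
    static final class Rule {
        final Pattern pattern;
        final String destination;

        Rule(String regex, String destination) {
            // Compile once, with DOTALL so '.' also matches line breaks
            this.pattern = Pattern.compile(regex, Pattern.DOTALL);
            this.destination = destination;
        }
    }

    /** Returns the distinct destinations whose rule matches the whole message, comma-separated. */
    static String recipientsFor(String message, List<Rule> rules) {
        Set<String> destinations = new LinkedHashSet<>(); // keeps first-seen order, drops duplicates
        for (Rule rule : rules) {
            if (rule.pattern.matcher(message).matches()) {
                destinations.add(rule.destination);
            }
        }
        return String.join(",", destinations); // empty string when nothing matched
    }
}
```

With two rules that both send `.*ORDER.*` to a made-up `jms:queue:System.B.in`, a body containing ORDER yields that destination only once. Compiling each pattern when the rule is created, rather than per message, and using `String.join` instead of trimming a trailing comma are design choices of this sketch, not something the original code does.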

The bean messageCoalitionService simply saves the message body and headers so that messages can be reproduced and the system can be audited.

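I am not sure whether I have approached this problem the wrong way. Should I be using the Camel Async API, or do I need pipes to implement this? The Asynchronous Request Reply pattern at http://camel.apache.org/async.html looks close to what I need. Any help would be greatly appreciated.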

Answers