2016-05-20 65 views
1

我試圖通過設置輪詢器以定期輪詢來自隊列的(JSON)消息並處理消息並保存到我的數據庫來實現Spring IntegrationFlow與AWS SQS隊列一起工作。用於多個JSON對象模式的Spring IntegrationFlow

我成功地從隊列中輪詢單個JSON消息模式並轉換爲我的自定義對象。現在我有兩種JSON模式發送到同一個SQS隊列。例如,

"Type" : "Notification", "MessageId" : "xxxx-xxxx-xxxx", "TopicArn" : "arn:aws:sns:us-west-2:xxxxx:topicName00", "Subject" : "OK: \"test00\" in US-West-2", "Message" : "{\"AlarmName\":\"test00\..."

"Type" : "Notification", "MessageId" : "xxxxx-xxxx-xxxxx", "TopicArn" : "arn:aws:sns:us-west-2:xxxxxx:topicName01", "Message" : "{\"version\":\"0\",\"id\":\"xxxxx\",\"detail-type\":\"EC2 Instance State-change Notification\",\"source\":\"aws.ec2\..."

這些消息被髮送到相同的隊列,我想使用相同的輪詢器然後將消息路由到不同的變壓器和serviceActivator輪詢隊列(處理)根據他們的消息正文。

@Bean 
public IntegrationFlow sqsIntegrationFlow() 
{ 
    return IntegrationFlows.from(this.sqsMessageSource(), c -> c.poller(myPoller())) 
      .channel(new DirectChannel()) 
      .<Payload,Boolean>route(input -> input.value().contains("EC2 Instance State-change Notification"), 
        mapping -> mapping 
          .subFlowMapping("true", sf -> sf.channel(new DirectChannel()) 
            .transform(
              SqsMessageToInstanceConverter::convertSqsMessagesToInstanceInfo) 
            .channel(new DirectChannel()).handle((message) -> { 
             ec2InstanceService.updateInstanceInfo((List<SqsMessageResult>) message.getPayload()); 
            })) 
          .subFlowMapping("false", sf -> sf.channel(new DirectChannel()) 
            .transform(SqsMessageToInstanceConverter::convertSqsMessageToAlarmInfo) 
            .channel(new DirectChannel()).handle((alarm -> { 
             cwAlarmService.updateAlarmInfo(
               (List<SqsAlarmMessageResult>) alarm.getPayload()); 
            })))) 
      .get(); 
} 

我嘗試使用路由器作爲上述鑑定信息使用郵件正文(「EC2實例狀態變更通知」)的字符串,但得到的錯誤與

java.lang.ClassNotFoundException: org.springframework.integration.support.management.MappingMessageRouterManagement

我的問題是:

1.這是使用路由器的正確方法嗎?

2.我如何才能完成處理2個不同的JSON消息與集成流程的目標?

+1

考慮切換到我們的官方Spring集成AWS擴展:https://github.com/spring-projects/彈簧集成-AWS。我們沒有'SqsMessageSource',但是消息驅動監聽器。雖然我會對你選擇SI-AWS解決方案感興趣。謝謝! –

回答

1

是的,這是正確的(我在一個小時前寫了類似的流程)。看起來你有一些類路徑問題 - 該接口和路由器在同一個jar裏。你在哪裏運行你的應用程序?

嘗試使用-verbose JVM運行ARG,我只是做了,得到了這個...

... 
[Loaded org.springframework.integration.router.AbstractMessageRouter from file:/Users/.../.m2/repository/org/springframework/integration/spring-integration-core/4.2.5.RELEASE/spring-integration-core-4.2.5.RELEASE.jar] 
[Loaded org.springframework.integration.support.management.MappingMessageRouterManagement from file:/Users/.../.m2/repository/org/springframework/integration/spring-integration-core/4.2.5.RELEASE/spring-integration-core-4.2.5.RELEASE.jar] 
... 
+0

謝謝你,加里。似乎我錯過了包含依賴關係。在我的pom文件中添加了最新版本的spring-integration-core artifact後,java.lang.ClassNotFoundException消失。 – user20160519