我試圖通過設置輪詢器以定期輪詢來自隊列的(JSON)消息並處理消息並保存到我的數據庫來實現Spring IntegrationFlow與AWS SQS隊列一起工作。用於多個JSON對象模式的Spring IntegrationFlow
我成功地從隊列中輪詢單個JSON消息模式並轉換爲我的自定義對象。現在我有兩種JSON模式發送到同一個SQS隊列。例如,
"Type" : "Notification", "MessageId" : "xxxx-xxxx-xxxx", "TopicArn" : "arn:aws:sns:us-west-2:xxxxx:topicName00", "Subject" : "OK: \"test00\" in US-West-2", "Message" : "{\"AlarmName\":\"test00\..."
和
"Type" : "Notification", "MessageId" : "xxxxx-xxxx-xxxxx", "TopicArn" : "arn:aws:sns:us-west-2:xxxxxx:topicName01", "Message" : "{\"version\":\"0\",\"id\":\"xxxxx\",\"detail-type\":\"EC2 Instance State-change Notification\",\"source\":\"aws.ec2\..."
這些消息被髮送到相同的隊列,我想使用相同的輪詢器然後將消息路由到不同的變壓器和serviceActivator輪詢隊列(處理)根據他們的消息正文。
@Bean
public IntegrationFlow sqsIntegrationFlow()
{
return IntegrationFlows.from(this.sqsMessageSource(), c -> c.poller(myPoller()))
.channel(new DirectChannel())
.<Payload,Boolean>route(input -> input.value().contains("EC2 Instance State-change Notification"),
mapping -> mapping
.subFlowMapping("true", sf -> sf.channel(new DirectChannel())
.transform(
SqsMessageToInstanceConverter::convertSqsMessagesToInstanceInfo)
.channel(new DirectChannel()).handle((message) -> {
ec2InstanceService.updateInstanceInfo((List<SqsMessageResult>) message.getPayload());
}))
.subFlowMapping("false", sf -> sf.channel(new DirectChannel())
.transform(SqsMessageToInstanceConverter::convertSqsMessageToAlarmInfo)
.channel(new DirectChannel()).handle((alarm -> {
cwAlarmService.updateAlarmInfo(
(List<SqsAlarmMessageResult>) alarm.getPayload());
}))))
.get();
}
我嘗試使用路由器作爲上述鑑定信息使用郵件正文(「EC2實例狀態變更通知」)的字符串,但得到的錯誤與
java.lang.ClassNotFoundException: org.springframework.integration.support.management.MappingMessageRouterManagement
我的問題是:
1.這是使用路由器的正確方法嗎?
2.我如何才能完成處理2個不同的JSON消息與集成流程的目標?
考慮切換到我們的官方Spring集成AWS擴展:https://github.com/spring-projects/彈簧集成-AWS。我們沒有'SqsMessageSource',但是消息驅動監聽器。雖然我會對你選擇SI-AWS解決方案感興趣。謝謝! –