2016-04-13 49 views
1

我正在嘗試安裝一個彈簧集成的小彈簧引導應用程序。它所需要做的就是從jms隊列中取出一條消息,將請求解組到一個對象並路由到一個特定的bean以保持。我測試了路由部分,我可以確認它的工作。與彈簧一體化掙扎dsl

我在我的測試中有一個嵌入式activemq代理,我可以通過彈簧JmsTemplate發送消息,但它似乎沒有解組xml有效負載並路由消息。我可以在日誌中看到這一點:

16:42:09.285 [main] INFO c.m.z.v.o.VitelAsyncPersisterApplicationTests - Sending message 
16:42:09.289 [main] DEBUG o.s.j.c.CachingConnectionFactory - Registering cached JMS Session for mode 1: ActiveMQSession {id=ID:theblacklodge-59640-1460558526948-4:1:2,started=false} [email protected] 
16:42:09.289 [main] DEBUG o.s.j.c.JmsTemplate - Executing callback on JMS Session: Cached JMS Session: ActiveMQSession {id=ID:theblacklodge-59640-1460558526948-4:1:2,started=false} [email protected] 
16:42:09.295 [ActiveMQ Transport: tcp:///127.0.0.1:[email protected]] DEBUG o.a.a.b.TransportConnector - Publishing: tcp://localhost:61616 for broker transport URI: tcp://localhost:61616 
16:42:09.295 [ActiveMQ Transport: tcp:///127.0.0.1:[email protected]] DEBUG o.a.a.b.TransportConnector - Publishing: tcp://localhost:61616 for broker transport URI: tcp://localhost:61616 
16:42:09.295 [ActiveMQ Transport: tcp:///127.0.0.1:[email protected]] DEBUG o.a.a.b.r.AbstractRegion - test_broker adding destination: topic://ActiveMQ.Advisory.Producer.Queue.jms/test 
16:42:09.298 [main] DEBUG o.s.j.c.CachingConnectionFactory - Registering cached JMS MessageProducer for destination [queue://jms/test]: ActiveMQMessageProducer { value=ID:theblacklodge-59640-1460558526948-4:1:2:1 } 
16:42:09.301 [main] DEBUG o.s.j.c.JmsTemplate - Sending created message: ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = <?xml version="1.0" encoding="UTF-8" standalo...uteLogEvent>} 
16:42:09.305 [ActiveMQ Transport: tcp:///127.0.0.1:[email protected]] DEBUG o.a.a.b.r.Queue - test_broker Message ID:theblacklodge-59640-1460558526948-4:1:2:1:1 sent to queue://jms/test 
16:42:09.306 [ActiveMQ BrokerService[test_broker] Task-2] DEBUG o.a.a.b.r.Queue - queue://jms/test, subscriptions=1, memory=0%, size=1, pending=0 toPageIn: 1, Inflight: 0, pagedInMessages.size 0, pagedInPendingDispatch.size 0, enqueueCount: 1, dequeueCount: 0, memUsage:1332 
16:42:09.314 [main] INFO c.m.z.v.o.VitelAsyncPersisterApplicationTests - Message sent 

Spring集成日誌:

13:43:48.996 [main] INFO o.s.i.j.JmsMessageDrivenEndpoint - started [email protected]04469 
13:43:48.996 [main] INFO o.s.i.d.j.JmsInboundGateway - started org.springframework.integration.dsl.jms.JmsInboundGateway#0 
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.dsl.jms.JmsInboundGateway#0' 
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean] 
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - Adding {xml:unmarshalling-transformer} as a subscriber to the 'buildReceiverFlow.channel#0' channel 
13:43:48.996 [main] INFO o.s.i.c.DirectChannel - Channel 'application:test:-1.buildReceiverFlow.channel#0' has 1 subscriber(s). 
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#0 
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0' 
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#1' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean] 
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - Adding {router} as a subscriber to the 'msg.router' channel 
13:43:48.996 [main] INFO o.s.i.c.DirectChannel - Channel 'application:test:-1.msg.router' has 1 subscriber(s). 
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#1 
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#1' 
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#2' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean] 
13:43:48.996 [main] INFO o.s.i.c.DirectChannel - Channel 'application:test:-1.buildRouterFlow.subFlow#0.channel#0' has 1 subscriber(s). 
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#2 
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#2' 
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#3' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean] 
13:43:48.996 [main] INFO o.s.i.c.DirectChannel - Channel 'application:test:-1.buildRouterFlow.subFlow#1.channel#0' has 1 subscriber(s). 
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#3 
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#3' 
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#4' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean] 
13:43:48.996 [main] INFO o.s.i.c.DirectChannel - Channel 'application:test:-1.buildRouterFlow.subFlow#2.channel#0' has 1 subscriber(s). 
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#4 
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#4' 
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#5' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean] 
13:43:48.996 [main] INFO o.s.i.c.DirectChannel - Channel 'application:test:-1.buildRouterFlow.subFlow#3.channel#0' has 1 subscriber(s). 
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#5 
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#5' 
13:43:48.996 [main] INFO o.s.c.s.DefaultLifecycleProcessor - Starting beans in phase 2147483647 
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.jms.config.internalJmsListenerEndpointRegistry' of type [class org.springframework.jms.config.JmsListenerEndpointRegistry] 
13:43:48.997 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.jms.config.internalJmsListenerEndpointRegistry' 
13:43:49.002 [main] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'org.springframework.boot.context.properties.ConfigurationPropertiesBindingPostProcessor' 
13:43:49.002 [main] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'org.springframework.context.annotation.internalScheduledAnnotationProcessor' 
13:43:49.002 [main] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'org.springframework.integration.config.IdGeneratorConfigurer#0' 
13:43:49.017 [main] DEBUG o.s.b.a.l.AutoConfigurationReportLoggingInitializer - 

我不知道我已經離開了或配置錯誤:

測試用例:

@Test 
public void jmsIntegrationTest() { 
    RouteLogEvent log = new RouteLogEvent(); 
    log.setAgentId(8888); 
    log.setInteracitonId(95634); 
    log.setMax(5); 
    log.setQueueTime(1256L); 
    log.setRouteTime(96541L); 
    log.setScore(8); 

    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(500); 

    marshaller.marshal(log, new StreamResult(bytesOut)); 

    final String xmlPayload = new String(bytesOut.toByteArray()); 

    LOG.info("Sending message"); 

    jmsTemplate.send(jmsQueue, (s) -> { 
     return s.createTextMessage(xmlPayload); 
    }); 

    LOG.info("Message sent"); 

    List<RouteLogEvent> events = testDao.findAllRouteLogs(); 
    assertNotNull(events); 
    assertFalse(events.isEmpty()); 

    List<RouteLogEvent> filtered = events.stream().filter(val -> val.getAgentId() == 8888).collect(Collectors.toList()); 
    assertNotNull(filtered); 
    assertFalse(filtered.isEmpty()); 
} 

彈簧集成配置:

@SpringBootApplication 
@EnableIntegration 
public class VitelAsyncPersisterApplication { 

    private static final Map<Class, String> ROUTING_EVENTS = new HashMap<>(); 

    private static final String CHANNEL_RECORDING = "channel-recording"; 
    private static final String CHANNEL_INTERACTION_STATE = "channel-interaction-state"; 
    private static final String CHANNEL_AGENT_STATE = "channel-agent-state"; 
    private static final String CHANNEL_ROUTE_LOG = "channel-route"; 

    static { 
     ROUTING_EVENTS.put(AgentStateChangeEvent.class, CHANNEL_AGENT_STATE); 
     ROUTING_EVENTS.put(InteractionStateChangeEvent.class, CHANNEL_INTERACTION_STATE); 
     ROUTING_EVENTS.put(Recording.class, CHANNEL_RECORDING); 
     ROUTING_EVENTS.put(RouteLogEvent.class, CHANNEL_ROUTE_LOG); 
    } 

    @Value("${jms.queue.entity.persist}") 
    private String jmsQueueName; 

    @Value("${jms.broker.url}") 
    private String jmsBrokerUrl; 

    @Autowired 
    private EventDao eventDao; 

    @Bean 
    public Jaxb2Marshaller xmlMarshaller() { 
     Jaxb2Marshaller marshaller = new Jaxb2Marshaller(); 
     marshaller.setSchema(new ClassPathResource("entities.xsd")); 
     marshaller.setPackagesToScan("com.mhgad.za.vitel.persister.entities"); 

     return marshaller; 
    } 

    @Bean 
    public ConnectionFactory jmsConnFactory() { 
     ActiveMQConnectionFactory activeMq = new ActiveMQConnectionFactory(jmsBrokerUrl); 

     CachingConnectionFactory cachingConnFactory = new CachingConnectionFactory(); 
     cachingConnFactory.setTargetConnectionFactory(activeMq); 

     return cachingConnFactory; 
    } 

    @Bean 
    public IntegrationFlow buildReceiverFlow(ConnectionFactory jmsConnectionFactory, Jaxb2Marshaller marshaller) { 
     UnmarshallingTransformer xmlToObjTransformer = Transformers.unmarshaller(marshaller); 

     JmsInboundGatewaySpec jmsSpec = Jms.inboundGateway(jmsConnectionFactory).destination(jmsQueueName); 

     return IntegrationFlows.from(jmsSpec).transform(xmlToObjTransformer).channel("msg.router").get(); 
    } 

    @Bean 
    public IntegrationFlow buildRouterFlow() { 

     Function router = (p) -> { 
      if (ROUTING_EVENTS.containsKey(p.getClass())) { 
       return ROUTING_EVENTS.get(p.getClass()); 
      } else { 
       return null; 
      } 
     }; 

     return IntegrationFlows.from("msg.router").route(router, m -> m 
       .subFlowMapping(CHANNEL_AGENT_STATE, sf -> sf.handle((p) -> eventDao.save((AgentStateChangeEvent) p.getPayload()))) 
       .subFlowMapping(CHANNEL_INTERACTION_STATE, sf -> sf.handle((p) -> eventDao.save((InteractionStateChangeEvent) p.getPayload()))) 
       .subFlowMapping(CHANNEL_RECORDING, sf -> sf.handle((p) -> eventDao.save((Recording) p.getPayload()))) 
       .subFlowMapping(CHANNEL_ROUTE_LOG, sf -> sf.handle((p) -> eventDao.save((RouteLogEvent) p.getPayload())))).get(); 
    } 

    public static void main(String[] args) { 
     SpringApplication.run(VitelAsyncPersisterApplication.class, args); 
    } 
} 

回答

1

根據你的日誌,我真的沒有看到任何Spring Integration基礎設施。

那麼,也許你剛剛錯過了@EnableIntegration那裏?

再加上你的測試有點奇怪。它將消息發送給JMS並檢查數據庫的結果。但是我們並沒有在那裏看到你如何開始整合的配置。

由於您要僅收聽消息並將它們存儲在數據庫中,因此請考慮使用單向 JMS組件 - Jms.messageDriverChannelAdapter()。代替請求/回覆網關。

+0

嗨Artem,我修改了配置片段。它確實有@EnableIntegration。是的,我認爲請求/答覆是不正確的,但在Poller需要放置的地方掙扎着。 WRT日誌我添加了一些額外的輸出 – user3465651

+0

更改爲Jms.messageDriverChannelAdapter(),並將activemq的版本從5.13.2更正爲5.12.3,因爲我在本文中看到類似的問題 - > http:// stackoverflow .COM /問題/ 36007782 /彈簧配置 - 嵌入式的BrokerService – user3465651