2016-03-05 38 views

回答

2

我很高興你已經找到了解決方案。

我創建了一個示例應用程序,它使用 Java DSL 從標準輸入讀取資料、發送到 MQTT,然後再從 MQTT 接收並記錄到日誌。

下面是相關的部分:

// publisher 

/**
 * Publisher flow: reads from standard input (polled with a fixed delay of 1000),
 * appends " sent to MQTT" to each payload and passes the result to
 * {@link #mqttOutbound()}.
 */
@Bean
public IntegrationFlow mqttOutFlow() {
    return IntegrationFlows
            .from(CharacterStreamReadingMessageSource.stdin(),
                    endpoint -> endpoint.poller(Pollers.fixedDelay(1000)))
            .transform(payload -> payload + " sent to MQTT")
            .handle(mqttOutbound())
            .get();
}

/**
 * Outbound MQTT handler created with client id "siSamplePublisher"; it has
 * async mode enabled and uses "siSampleTopic" as its default topic.
 */
@Bean
public MessageHandler mqttOutbound() {
    MqttPahoMessageHandler publisher =
            new MqttPahoMessageHandler("siSamplePublisher", mqttClientFactory());
    publisher.setDefaultTopic("siSampleTopic");
    publisher.setAsync(true);
    return publisher;
}

// consumer 

/**
 * Consumer flow: takes messages produced by {@link #mqttInbound()}, appends
 * ", received from MQTT" to each payload and passes the result to the
 * logging handler.
 */
@Bean
public IntegrationFlow mqttInFlow() {
    return IntegrationFlows
            .from(mqttInbound())
            .transform(payload -> payload + ", received from MQTT")
            .handle(logger())
            .get();
}

/**
 * Creates the logging handler used by the consumer flow: constructed with
 * level "INFO" and logger name "siSample".
 */
private LoggingHandler logger() {
    LoggingHandler infoLogger = new LoggingHandler("INFO");
    infoLogger.setLoggerName("siSample");
    return infoLogger;
}

/**
 * Inbound MQTT channel adapter created with client id "siSampleConsumer" and
 * topic "siSampleTopic"; configured with QoS 1, a completion timeout of 5000
 * and a {@code DefaultPahoMessageConverter}.
 */
@Bean
public MessageProducerSupport mqttInbound() {
    MqttPahoMessageDrivenChannelAdapter consumer =
            new MqttPahoMessageDrivenChannelAdapter(
                    "siSampleConsumer", mqttClientFactory(), "siSampleTopic");
    consumer.setQos(1);
    consumer.setConverter(new DefaultPahoMessageConverter());
    consumer.setCompletionTimeout(5000);
    return consumer;
}

foo 
14:40:56.770 [MQTT Call: siSampleConsumer] INFO siSample - foo sent to MQTT, received from MQTT 

編輯

官方的 Spring Integration MQTT 範例(包含註解配置與 DSL 配置)位於:https://github.com/spring-projects/spring-integration-samples/tree/master/basic/mqtt

+0

請參閱我在 Gary 的回答中所做的編輯。 –

2

已通過使用 Spring Boot 解決:

/**
 * Spring Boot entry point that wires an MQTT inbound adapter to a printing
 * handler and exposes a messaging gateway for publishing to MQTT.
 *
 * <p>Inbound: {@link #inbound()} -> {@code mqttInputChannel} -> {@link #handler()}.
 * Outbound: {@link MyGateway} -> {@code mqttOutboundChannel} -> {@link #mqttOutbound()}.
 */
@Configuration
@ComponentScan
@EnableAutoConfiguration
@IntegrationComponentScan
public class Application {

    /** Starts the Spring application using this class as the configuration source. */
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    /** Channel that carries messages from the inbound adapter to {@link #handler()}. */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Client factory shared by the inbound adapter and the outbound handler;
     * the server URI and credentials here are placeholders.
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();
        clientFactory.setServerURIs("tcp://url:10423");
        clientFactory.setUserName("username");
        clientFactory.setPassword("password");
        return clientFactory;
    }

    /**
     * Inbound adapter created with client id "testMqtt" and topic "test";
     * its output channel is {@link #mqttInputChannel()}.
     */
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter subscriber =
                new MqttPahoMessageDrivenChannelAdapter("testMqtt", mqttClientFactory(), "test");
        subscriber.setCompletionTimeout(5000);
        subscriber.setConverter(new DefaultPahoMessageConverter());
        subscriber.setQos(1);
        subscriber.setOutputChannel(mqttInputChannel());
        return subscriber;
    }

    /** Service activator on {@code mqttInputChannel} that prints each payload to stdout. */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println("!!!!!!!!!!!!!!!!!!!" + message.getPayload());
            }

        };
    }

    /**
     * Service activator on {@code mqttOutboundChannel}: an MQTT handler created
     * with client id "testClient", async mode enabled and default topic "test".
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler publisher =
                new MqttPahoMessageHandler("testClient", mqttClientFactory());
        publisher.setAsync(true);
        publisher.setDefaultTopic("test");
        return publisher;
    }

    /** Channel that carries gateway requests to {@link #mqttOutbound()}. */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /** Gateway whose requests are sent to {@code mqttOutboundChannel}. */
    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {

        /** Sends {@code data} to the gateway's default request channel. */
        void sendToMqtt(String data);

    }

}