在哪裏可以找到示例如何使用MQTT + JAva Config?Spring MQTT JAva Config示例問題
這不是爲我工作:http://docs.spring.io/spring-integration/reference/html/mqtt.html
在哪裏可以找到示例如何使用MQTT + JAva Config?Spring MQTT JAva Config示例問題
這不是爲我工作:http://docs.spring.io/spring-integration/reference/html/mqtt.html
我很高興你已經找到了解決方案。
我創建了一個示例應用程序,它使用Java DSL從標準輸入讀取,發送到MQTT,接收和記錄。
下面是相關的位:
// publisher
@Bean
public IntegrationFlow mqttOutFlow() {
return IntegrationFlows.from(CharacterStreamReadingMessageSource.stdin(),
e -> e.poller(Pollers.fixedDelay(1000)))
.transform(p -> p + " sent to MQTT")
.handle(mqttOutbound())
.get();
}
@Bean
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("siSamplePublisher", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic("siSampleTopic");
return messageHandler;
}
// consumer
@Bean
public IntegrationFlow mqttInFlow() {
return IntegrationFlows.from(mqttInbound())
.transform(p -> p + ", received from MQTT")
.handle(logger())
.get();
}
private LoggingHandler logger() {
LoggingHandler loggingHandler = new LoggingHandler("INFO");
loggingHandler.setLoggerName("siSample");
return loggingHandler;
}
@Bean
public MessageProducerSupport mqttInbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("siSampleConsumer",
mqttClientFactory(), "siSampleTopic");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
return adapter;
}
。
foo
14:40:56.770 [MQTT Call: siSampleConsumer] INFO siSample - foo sent to MQTT, received from MQTT
編輯
官方Spring集成MQTT樣品與註解& DSL配置位於:https://github.com/spring-projects/spring-integration-samples/tree/master/basic/mqtt
在加里的回答中看到我的編輯。 –
解決通過使用Spring引導
@Configuration
@ComponentScan
@EnableAutoConfiguration
@IntegrationComponentScan
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setServerURIs("tcp://url:10423");
factory.setUserName("username");
factory.setPassword("password");
return factory;
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter("testMqtt", mqttClientFactory(),
"test");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println("!!!!!!!!!!!!!!!!!!!" + message.getPayload());
}
};
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler("testClient", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic("test");
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MyGateway {
void sendToMqtt(String data);
}
}
什麼是不工作? –