2016-10-14 39 views
2

嗨我試圖使用彈簧集成來接收MQTT消息,處理它們併發布到另一個主題。春季Mqtt集成:出站主題問題

這裏是integration.xml:

<int-mqtt:outbound-channel-adapter id="mqtt-publish" 
    client-id="spring-foo-1" 
    client-factory="clientFactory" 
    auto-startup="true" 
    url="tcp://localhost:1883" 
    default-qos="0" 
    default-retained="true" 
    default-topic="tweets/akki" /> 

    <int-mqtt:message-driven-channel-adapter id="oneTopicAdapter" 
    client-id="spring-foo-2" 
    client-factory="clientFactory" 
    auto-startup="true" 
    url="tcp://localhost:1883" 
    topics="mqtt/publish" 
    /> 

    <int:service-activator input-channel="oneTopicAdapter" method="logMessages" ref="mqttLogger" output-channel="mqtt-publish"></int:service-activator> 

    <bean id="mqttLogger" class="hello.mqttReceiver" /> 

而且mqttReceiver.java:

package hello; 
public class mqttReceiver { 
    public String logMessages(String a){ 
     String processed_data = a; //TODO Process Data 
     return processed_data; 
    } 
} 

以下是問題,我面對:

  • processed_data被送到mqtt/publish and not mqtt/akki
  • The processed_data沒有發佈的信息,但很多時候

回答

3

這是正確的,因爲AbstractMqttMessageHandler需要看首先進入headers

String topic = (String) message.getHeaders().get(MqttHeaders.TOPIC); 
    Object mqttMessage = this.converter.fromMessage(message, Object.class); 
    if (topic == null && this.defaultTopic == null) { 
     throw new MessageHandlingException(message, 
       "No '" + MqttHeaders.TOPIC + "' header and no default topic defined"); 
    } 

DefaultPahoMessageConverter填充從MqttPahoMessageDrivenChannelAdapter上消息到達MqttHeaders.TOPIC頭。

你應該考慮將消息發送到<int-mqtt:outbound-channel-adapter>

+1

您也可以使用頭,濃縮塔更換主題標題前使用<int:header-filter header-names="mqtt_topic"/>(覆蓋設置爲true)。 –