2017-07-03

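We are using the Spring message-driven-channel-adapter to subscribe to MQTT topics, but we frequently run into an error. I tested the connection with a JavaScript client (mqttws31.js) and it works fine, which suggests there is no connectivity problem. Spring MqttPahoMessageDrivenChannelAdapter loses the connection: Connection lost; retrying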

Error:

org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter connectionLost 
SEVERE: Lost connection:Connection lost; retrying... 

MQTT message:

[payload=6483D03E4C75BA943148F18D73,1.00,1E, headers={mqtt_retained=false, mqtt_qos=0, 
id=5fa41168-34c6-1e3d-a775-e3146842990a, mqtt_topic=TEST/GATEWAY2, mqtt_duplicate=false, timestamp=1499067757559}] 

Configuration:

<bean id="clientFactory" 
    class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory"> 
    <property name="userName" value="${mqtt.username}" /> 
    <property name="password" value="${mqtt.password}" /> 
</bean> 

<int-mqtt:message-driven-channel-adapter 
    id="mqttInbound" client-id="${mqtt.default.client.id}" url="${mqtt.url}" 
    topics="${topics}" client-factory="clientFactory" auto-startup="true" 
    channel="output" error-channel="errorChannel" /> 


<int:channel id="output" /> 
<int:channel id="errorChannel" /> 

<int:service-activator input-channel="errorChannel" 
    ref="errorMessageLogger" method="logError" /> 
<bean id="errorMessageLogger" class="com.mqtt.ErrorMessageLogger" /> 

<int:service-activator input-channel="output" 
    method="handleMessage" ref="mqttLogger" /> 
<bean id="mqttLogger" class="com.mqtt.MqttReciever" /> 

pom.xml:

<dependency> 
    <groupId>org.eclipse.paho</groupId> 
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId> 
    <version>1.1.1</version> 
</dependency> 
<dependency> 
    <groupId>org.springframework.integration</groupId> 
    <artifactId>spring-integration-mqtt</artifactId> 
    <version>4.2.2.RELEASE</version> 
</dependency> 

While debugging org.eclipse.paho.client.mqttv3-1.1.1-sources.jar:

CommsReceiver.java

public void run() { 
     final String methodName = "run"; 
     MqttToken token = null; 

     while (running && (in != null)) { 
      try { 
       //@TRACE 852=network read message 
       log.fine(CLASS_NAME,methodName,"852"); 
       receiving = in.available() > 0; 
       MqttWireMessage message = in.readMqttWireMessage(); 
       receiving = false; 

       // instanceof checks if message is null 
       if (message instanceof MqttAck) { 
        token = tokenStore.getToken(message); 
        if (token!=null) { 
         synchronized (token) { 
          // Ensure the notify processing is done under a lock on the token 
          // This ensures that the send processing can complete before the 
          // receive processing starts! (request and ack and ack processing 
          // can occur before request processing is complete if not! 
          clientState.notifyReceivedAck((MqttAck)message); 
         } 
        } else if(message instanceof MqttPubRec || message instanceof MqttPubComp || message instanceof MqttPubAck) { 
         //This is an ack for a message we no longer have a ticket for. 
         //This probably means we already received this message and it's being send again 
         //because of timeouts, crashes, disconnects, restarts etc. 
         //It should be safe to ignore these unexpected messages. 
         log.fine(CLASS_NAME, methodName, "857"); 
        } else { 
         // It its an ack and there is no token then something is not right. 
         // An ack should always have a token assoicated with it. 
         throw new MqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR); 
        } 
       } else { 
        if (message != null) { 
         // A new message has arrived 
         clientState.notifyReceivedMsg(message); 
        } 
       } 
      } 
      catch (MqttException ex) { 
       //@TRACE 856=Stopping, MQttException 
       log.fine(CLASS_NAME,methodName,"856",null,ex); 
       running = false; 
       // Token maybe null but that is handled in shutdown 
       clientComms.shutdownConnection(token, ex); 
      } 
      catch (IOException ioe) { 
       //@TRACE 853=Stopping due to IOException 
       log.fine(CLASS_NAME,methodName,"853"); 

       running = false; 
       // An EOFException could be raised if the broker processes the 
       // DISCONNECT and ends the socket before we complete. As such, 
       // only shutdown the connection if we're not already shutting down. 
       if (!clientComms.isDisconnecting()) { 
        clientComms.shutdownConnection(token, new MqttException(MqttException.REASON_CODE_CONNECTION_LOST, ioe)); 
       } 
      } 
      finally { 
       receiving = false; 
      } 
     } 

     //@TRACE 854=< 
     log.fine(CLASS_NAME,methodName,"854"); 
    } 

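In the method above, in.readMqttWireMessage() sometimes throws an IOException. The catch block then shuts the connection down, which triggers the reconnect, using clientComms.shutdownConnection(token, ...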
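To make the IOException path concrete, here is a minimal JDK-only sketch of why a blocking read fails when the peer closes the socket. It is not Paho code: the class `EofDemo` and the method `readFirstByte` are hypothetical names, and a `ByteArrayInputStream` stands in for the network stream. An empty stream plays the role of a broker that has closed the connection.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;

// Hypothetical illustration, not part of Paho.
final class EofDemo {

    // Reads one byte the way a blocking wire-protocol reader would,
    // and returns a label describing what happened.
    static String readFirstByte(byte[] bytesFromPeer) {
        try (DataInputStream in =
                 new DataInputStream(new ByteArrayInputStream(bytesFromPeer))) {
            byte first = in.readByte();          // throws EOFException at end of stream
            return "read:" + (first & 0xFF);
        } catch (EOFException eof) {
            // The stream ended before a byte arrived: the peer closed the connection.
            return "connection-lost";
        } catch (IOException ioe) {
            // Any other I/O failure.
            return "io-error";
        }
    }

    public static void main(String[] args) {
        System.out.println(readFirstByte(new byte[] {0x30})); // one byte available
        System.out.println(readFirstByte(new byte[0]));       // peer closed the stream
    }
}
```

In the quoted CommsReceiver.run(), the equivalent failure is caught as an IOException and wrapped in an MqttException with REASON_CODE_CONNECTION_LOST, so an EOFException here usually points at the other end closing the socket rather than at a bug in the read loop.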

+1

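Your question is unclear. If the connection is lost, it means there is a problem with the connection, possibly a network error. The adapter will try to reconnect. –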

+0

Please see the updated question. I have tested the connection with a JavaScript client and it works fine. – HybrisFreelance

Answer

1

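But you still haven't really described a problem. You show a received message above, so it must be working for you. Paho is detecting a connection problem; it notifies Spring Integration, which will reconnect.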
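The "detect, then retry" behaviour can be pictured with a small JDK-only sketch. This is not Spring Integration's implementation: `RetryDemo`, `connectWithRetry`, and the `Callable<Boolean>` stand-in for a connect attempt are hypothetical, and the fixed interval is an assumption made only for illustration.

```java
import java.util.concurrent.Callable;

// Hypothetical illustration of a fixed-interval reconnect loop.
final class RetryDemo {

    // Calls connect until it returns true or maxAttempts is reached.
    // Returns the number of attempts used, or -1 if every attempt failed
    // or the thread was interrupted while waiting.
    static int connectWithRetry(Callable<Boolean> connect,
                                int maxAttempts,
                                long intervalMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                if (connect.call()) {
                    return attempt;              // connected
                }
            } catch (Exception e) {
                // Treat an exception as a failed attempt and keep retrying.
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(intervalMillis); // wait before the next attempt
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return -1;
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated broker that accepts the third connection attempt.
        int attempts = connectWithRetry(() -> ++calls[0] >= 3, 5, 10L);
        System.out.println("attempts used: " + attempts);
    }
}
```

Seen this way, a "Connection lost; retrying..." log line is the retry loop announcing itself; messages keep arriving once a later attempt succeeds, which matches the received message shown in the question.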

By adding an ApplicationListener to your application, you can get the full details of the exception.

@Bean 
public ApplicationListener<?> eventListener() { 
    return new ApplicationListener<MqttConnectionFailedEvent>() { 

     @Override 
     public void onApplicationEvent(MqttConnectionFailedEvent event) { 
      event.getCause().printStackTrace(); 
     } 

    }; 
} 

Result:

Connection lost (32109) - java.io.EOFException 
    at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:164) 
    at java.lang.Thread.run(Thread.java:748) 
Caused by: java.io.EOFException 
    at java.io.DataInputStream.readByte(DataInputStream.java:267) 
    at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:92) 
    at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:116) 
    ... 1 more 

(when I shut down the broker).
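Instead of printing the whole stack trace, a listener could log only the innermost cause, which in the trace above is the java.io.EOFException. A minimal JDK-only sketch follows; `RootCauseDemo` and `rootCause` are hypothetical helper names, and a plain RuntimeException stands in for the MqttException.

```java
import java.io.EOFException;

// Hypothetical helper, not part of Spring Integration or Paho.
final class RootCauseDemo {

    // Follows getCause() links until the innermost throwable is reached.
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Stand-in for "Connection lost (32109) - java.io.EOFException".
        Throwable wrapped = new RuntimeException("Connection lost", new EOFException());
        System.out.println(rootCause(wrapped).getClass().getName());
    }
}
```

Inside onApplicationEvent, such a helper would be applied to event.getCause() to tell a closed socket (EOFException) apart from other I/O failures.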

If you believe there is a problem in the Paho client, you should raise an issue with that project.

+0

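'But you still haven't really described a problem' - the problem was the frequent "Connection lost; retrying..." message, which is not what I expected. But it is working fine now. I switched to the latest mqtt (4.3.10.RELEASE) and ran a maven clean/install. It now works as expected, with no connection-lost errors. Thanks for your time. – HybrisFreelance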