2017-03-04 76 views
0

我創造流如 -如何註冊springxd源MQTT與彈簧XD到MQTT經紀人

stream create mqtttestfile --definition "mqtt --url='tcp://localhost:1883' --topics='helloTopic' | file" --deploy 

創建和部署新的數據流「mqtttestfile」。我還檢查了localhost:9393/admin-ui,流被成功創建和部署。

我的MQTT代理正在localhost:1883上運行。 但是當我檢查/ tmp/xd /輸出文件目錄時,mqtttestfile.out文件丟失。

我需要在以下我的假設點澄清: -

  1. 我想MQTT客戶端在彈簧XD源MQTT模塊已經配置。所以當我們創建流時,它會自動訂閱經紀商的特定主題。

  2. 我也嘗試運行兩個單獨的python腳本,一個用於訂閱,另一個用於單獨終端上的發佈者,它工作正常。所以mqtt經紀人沒有問題。

這是由彈簧XD控制檯日誌我得到:

istener - 15000毫秒

調度部署到新的容器(S)

2017-03-04T12:07:51 + 0530 1.3.0.RELEASE INFO DeploymentsPathChildrenCache-0 container.DeploymentListener - 路徑緩存事件:path =/deployments/modules/allocated/d634d310-12b4-4a83-baea-c1c98dfb7bba/mqtttestfile.sink.file.1,type = CHILD_ADDED

2017-03-04T12:07:51 + 0530 1.3.0.RELEASE INFO DeploymentsPathChildrenCache-0 c ontainer.DeploymentListener - 爲流'mqtttestfile'部署模塊'文件' 2017-03-04T12:07:52 + 0530 1.3.0.RELEASE INFO DeploymentsPathChildrenCache-0 container.DeploymentListener - 部署模塊[ModuleDescriptor @ 7e658391 moduleName ='file' ,moduleLabel ='file',group ='mqtttestfile',sourceChannelName = [null],sinkChannelName = [null],index = 1,type = sink,parameters = map [[empty]],children = list [[empty]] ]

2017-03-04T12:07:54 + 0530 1.3.0.RELEASE INFO DeploymentsPathChildrenCache-0 container.DeploymentListener - 路徑緩存事件:path =/deployments/modules/allocated/d634d310-12b4-4a83-baea- c1c98dfb7bba/mqtttestfile.source.mqtt.1,type = CHILD_ADDED

2017-03-04T12:07:54 + 0530 1.3.0.RELEASE INFO部署sPathChildrenCache-0 container.DeploymentListener - 爲流'mqtttestfile'部署模塊'mqtt' 2017-03-04T12:07:54 + 0530 1.3.0.RELEASE INFO DeploymentsPathChildrenCache-0 container.DeploymentListener - 部署模塊[ModuleDescriptor @ 5a7d8e37 moduleName = 'mqtt',moduleLabel ='mqtt',group ='mqtttestfile',sourceChannelName = [null],sinkChannelName = [null],index = 0,type = source,parameters = map ['topics' - >'helloTopic',' url' - >'tcp:// localhost:1883'],children = list [[empty]]]

2017-03-04T12:07:56 + 0530 1.3.0.RELEASE INFO DeploymentSupervisor-0 zk。 ZKStreamDeploymentHandler - 流'mqtttestfile'的部署狀態:DeploymentStatus {state = deployed}

with spring-xd 1.3.1仍然存在問題未解決,這是我在日誌中看到的錯誤消息 -

2017-03-05T01:15:06 + 0530 1.3.1.RELEASE INFO LeaderSelector-1 zk。DeploymentSupervisor - 由於線程中斷導致取消領導

2017-03-05T01:15:06 + 0530 1.3.1.RELEASE ERROR MQTT Rec:xd.mqtt.client.id.src inbound.MqttPahoMessageDrivenChannelAdapter - 丟失連接:丟失連接;正在重試...

謝謝。

回答

0

我只是測試你的流,它爲我工作得很好......

$ cat /tmp/xd/output/mqtttestfile.out 
foo 
bar 

(後我加入的消息Foo和Bar隊列)。

隨着org.springframework.integration啓用(在容器的logback.grooy文件)的調試日誌,我看到...

2017-03-04T08:02:59-0500 1.3.1.RELEASE INFO DeploymentsPathChildrenCache-0 endpoint.EventDrivenConsumer - started outbound.mqtttestfile.0 
2017-03-04T08:02:59-0500 1.3.1.RELEASE DEBUG DeploymentsPathChildrenCache-0 inbound.MqttPahoMessageDrivenChannelAdapter - Connected and subscribed to [helloTopic] 
2017-03-04T08:02:59-0500 1.3.1.RELEASE INFO DeploymentsPathChildrenCache-0 inbound.MqttPahoMessageDrivenChannelAdapter - started mqttInbound 
2017-03-04T08:02:59-0500 1.3.1.RELEASE INFO DeploymentSupervisor-0 zk.ZKStreamDeploymentHandler - Deployment status for stream 'mqtttestfile': DeploymentStatus{state=deployed} 

奇怪的是,你是不是看到...started...消息(INFO)。

你可以試用版本1.3.1嗎?

+0

目前我使用的是spring-xd 1.3.0,我會試着用1.3.1並讓你更新。謝謝。 – andy

+0

我仍然收到相同的錯誤。我做錯了什麼請檢查以下錯誤日誌msg- 2017-03-05T01:15:06 + 0530 1.3.1.RELEASE ERROR MQTT Rec:xd.mqtt.client.id.src inbound.MqttPahoMessageDrivenChannelAdapter - 丟失的連接:連接丟失;正在重試... 2017-03-05T01:15:06 + 0530 1.3.1.RELEASE INFO LeaderSelector-1 zk.DeploymentSupervisor - 由於線程中斷導致取消 – andy

+0

我很難說;正如我所說,你的流定義對我來說工作得很好;我只是在1.3.0上測試過它,結果相同;沒有錯誤;數據顯示在文件中。它看起來像你有一些網絡/連接錯誤(動物園管理員也如此),但如果你連接到本地主機,這很奇怪。我建議你打開DEBUG日誌記錄(對於所有內容),看看你是否能夠弄清楚發生了什麼。 –