2017-03-29 176 views
0

我使用mqtt協議將傳感器數據的消息發送給蚊子代理。我想要做的是每隔t秒發送一次傳感器數據,但是如果我收到消息並行處理它。是使用time.sleep(),但我認爲這會延遲「on_message」函數。我使用paho-mqtt和python 2.7。關於如何完成這樣的事情的任何想法?在等待消息時發送消息

客戶端1號(發送傳感器數據)

from mysignals import mysignals 
import paho.mqtt.client as mqtt 
import time 

def on_connect(client, userdata, rc): 
    mqttc.subscribe(topic='/+/mysignals/status', qos=0) 
    mqttc.subscribe(topic='/+/mysignals/add_sensor',qos=0) 
    mqttc.subscribe(topic='/+/mysignals/remove_sensor',qos=0) 

def on_message(client,userdata,message): 
    print 'received data' 
    base_topic = '/mysignals' 
    member_id = message.topic.split('/')[1] 
    status_topic = '/mysignals/status' 
    add_sensor_topic = '/mysignals/add_sensor' 
    remove_sensor_topic = '/mysignals/remove_sensor' 
    log_topic = '/log' 
    if status_topic in message.topic: 
     action = mysignals_test.change_status(int(member_id),int(message.payload)) 
     mqttc.publish(topic='/'+member_id+status_topic+log_topic+'/',payload=action,qos=0) 
    elif add_sensor_topic in message.topic: 
     action = mysignals_test.add_sensor(message.playload,int(member_id)) 
     mqttc.publish(topic='/'+member_id+add_sensor_topic+log_topic+'/',payload=action,qos=0) 
    elif remove_sensor_topic in message.topic: 
     action = mysignals_test.remove_sensor(message.payload,int(member_id)) 
     mqttc.publish(topic='/'+member_id+remove_sensor_topic+log_topic+'/',payload=action,qos=0) 
    else: 
     mqttc.publish(topic='/'+member_id+base_topic+log_topic+'/',payload='Wrong Action.',qos=0) 

mysignals_test = mysignals(email='blablabla',password='blabla') 
mysignals_test.add_sensor('temp',150) 
mysignals_test.change_status(150,1) 
mqttc = mqtt.Client(client_id='mysignals') 
mqttc.on_connect = on_connect 
mqttc.on_message = on_message 
mqttc.connect('broker ip') 
mqttc.loop_start() 

while True: 
    for member in mysignals_test.members: 
     if member.status == 1: 
      live_data = mysignals_test.live(member.member_id) 
      for data in live_data: 
       topic = '/'+str(data.member_id)+'/mysignals/'+str(data.sensor_id)+'/' 
       qos = 0 
       retain = False 
       if 'raw' in data.sensor_id: 
        payload = data.values 
       else: 
        payload = data.value 
       mqttc.publish(topic=topic,payload=payload,qos=qos,retain=retain) 
    print 'sent data' 
    time.sleep(55) 

客戶NO2(從客戶端接收數據1.Only subscriber.Also發送測試消息client1.only訂戶太。)

import paho.mqtt.client as mqtt 

def on_connect(client, userdata, rc): 
    mqttc.subscribe(topic='/+/mysignals/+/', qos=0) 
    mqttc.subscribe(topic='/+/mysignals/status/+/', qos=0) 
def on_disconnect(client,userdata,rc): 
    pass 
def on_message(client, userdata, message): 
    if str(message.payload) == '': 
     print 'empty message' 
    else: 
     print 'Received message :' + str(message.payload) + ', on topic: '+ message.topic + ', with QoS: ' + str(message.qos) 
     mqttc.publish(topic='/154/mysignals/status',payload=0,qos=0) 

mqttc = mqtt.Client(client_id='P1') 
mqttc.on_connect = on_connect 
mqttc.on_disconnect = on_disconnect 
mqttc.on_message = on_message 
mqttc.connect('broker ip') 
mqttc.loop_start() 

只是爲了記錄「mysignals」對象是由我製作的,而不是存在於其中的東西。上面的代碼的問題是,當客戶端2接收到傳感器數據時,它會掛起並將測試消息無縫地發回給代理。客戶端2在接收到sens時發送的測試消息或者數據應該被客戶端1讀取,並且客戶端1應該相應地操作「mysignals」對象。

客戶端2輸出:

Received message :25.4, on topic: /150/mysignals/temp/, with QoS: 0 
Received message :Success, on topic: /154/mysignals/status/log/, with QoS: 0 
Received message :Success, on topic: /154/mysignals/status/log/, with QoS: 0 
Received message :Success, on topic: /154/mysignals/status/log/, with QoS: 0 
Received message :Success, on topic: /154/mysignals/status/log/, with QoS: 0 
Received message :Success, on topic: /154/mysignals/status/log/, with QoS: 0 
Received message :Success, on topic: /154/mysignals/status/log/, with QoS: 0 
Received message :Success, on topic: /154/mysignals/status/log/, with QoS: 0 
Received message :Success, on topic: /154/mysignals/status/log/, with QoS: 0 
Received message :Success, on topic: /154/mysignals/status/log/, with QoS: 0 
Received message :Success, on topic: /154/mysignals/status/log/, with QoS: 0 
Received message :Success, on topic: /154/mysignals/status/log/, with QoS: 0 
Received message :Success, on topic: /154/mysignals/status/log/, with QoS: 0 
Received message :Success, on topic: /154/mysignals/status/log/, with 

客戶端1個輸出:

Login Successfull. 
sent data 
sent data 
received data 
received data 
received data 
received data 
received data 
received data 
received data 
received data 

PS:Im不包括 「mysignals.py」,因爲是大約200行。

+3

編輯問題向我們展示您已經編寫的代碼,我們將幫助您改進它。 – hardillb

+0

在這種情況下,通常會使用多線程。一個線程發送消息。另一個接收它們。 – DyZ

+0

我添加了我已經編寫的代碼和輸出。 –

回答

0

好吧我設法得到了預期的結果。我必須在狀態子主題上創建2個子主題。一個客戶端應該在日誌子主題上寫入suptopic狀態,一個客戶端應該寫入狀態子主題的值子主題。我還刪除了訂閱上的qos參數。我還添加了time.sleep(),我很少會錯過任何消息。只有在非常短的時間間隔內接收到2條消息時。最大的問題是我不知道主題和子主題實際上工作。

1

我對Paho MQTT庫中回調函數的使用方式的理解是後臺循環(由loop_start啓動)會中斷主線程,所以我不擔心使用time.sleep()的延遲。所以如果你的主要擔心是不拖延on_message回調,它應該不成問題。我經常在使用MQTT回調的Python腳本中使用sleep。

當然回調可能會稍微延遲傳感器發送數據,但是您是否確實需要以絕對精確的時間發送數據?大多數數據庫(例如RRD)都可以輕鬆適應稍微更新的時間。或者,如果您的on_message回調需要很長時間來處理,請考慮將MQTT消息的有效內容從回調函數中傳出並在腳本的其他地方處理。

如果您確實需要傳感器更新的分秒精度,請考慮將函數分成兩個腳本(或線程),每個腳本(或線程)僅用於一個目的(發送或接收)。