2017-07-20 89 views
-1

我試圖讓mqtt客戶端收到來自它已訂閱的主題的所有郵件,但它每次收到郵件時都只收到第一個郵件另一個客戶端。問題在於,客戶端應該使用qos 2處理10條消息,而只處理第一條消息。消息以幾毫秒的時間間隔同時發送。我不是經常發送消息。我每分鐘發送10條消息。兩個客戶端都是持久的。我確信這封郵件離開了發佈者,因爲無論何時發送郵件,我都會打印它的有效載荷。我使用的是qos 2,因爲收到的消息會保存到數據庫中,我不想重複。我使用的代理是activemq。所以問題是爲什麼會發生這種情況?Mqtt客戶端無法同時處理多個郵件

from sqlalchemy.ext.automap import automap_base 
from sqlalchemy.orm import Session 
from sqlalchemy import create_engine 
from sqlalchemy import update 
from sqlalchemy.ext.automap import generate_relationship 
import sqlalchemy 
import paho.mqtt.client as mqtt 
import time 
#Function that define what to do on client conenction 
def on_connect(client, userdata, rc): 
    #Subscribe to all specified topics 
    mqttc.subscribe(topic='/+/mysignals/sensors/+/') 
def on_message(client,userdata,message): 
    #Get the mysignals member id from the topic 
    topic_split = message.topic.split('/') 
    member_id = topic_split[1] 
    session = Session(engine) 
    sensor_id = topic_split[4] 
    patient = session.query(Patient).filter(Patient.mysignalsid==member_id).first() 
    if message.payload == None: 
     payload = 0 
    else: 
     payload = message.payload 
    if patient: 
     current_time = time.time() 
     if patient.id in pending.keys() and (current_time - pending[patient.id]['time_created']) <= 55: 
      pending[patient.id]['record'].__dict__[sensor_id] = payload 
      print time.time() 
     else: 
      pending.pop(patient.id,None) 
      patientdata = PatientData() 
      patientdata.__dict__[sensor_id] = payload 
      print patientdata.__dict__[sensor_id] 
      print payload 
      print patientdata.temp 
      patient.patientdata_collection.append(patientdata) 
      session.add(patientdata) 
      print time.time() 
      pending.update({patient.id:{ 
            'time_created':time.time(), 
            'record':patientdata, 
            }}) 
     session.flush() 
     session.commit() 
     print('Wrote to database.') 

pending = {} 
Base = automap_base() 
engine = create_engine('mysql+mysqlconnector://user:[email protected]/db') 
# reflect the tables 
Base.prepare(engine, reflect=True) 
Patient = Base.classes.patient 
PatientData = Base.classes.patientdata 
session = Session(engine) 
#Create a mqtt client object 
mqttc = mqtt.Client(client_id='database_logger',clean_session=False) 
#Set mqtt client callbacks 
mqttc.on_connect = on_connect 
mqttc.on_message = on_message 
#Set mqtt broker username and password 
mqttc.username_pw_set('blah','blahblah') 
#Connect to the mqtt broker with the specified hostname/ip adress 
mqttc.connect('127.0.0.1') 
mqttc.loop_forever() 

輸出繼電器:

98 
98 
None 
1500576377.3 
Wrote to database. 
1500576377.43 
Wrote to database. 

輸出應該是:

98 
98 
None 
1500576377.3 
Wrote to database. 
25.4 
25.4 
25.4 
1500576377.43 
Wrote to database. 
+0

編輯問題顯示您的代碼 – hardillb

+0

我更新了帖子並添加了我的代碼。 –

+0

首先,你的話題不應該以'/'開頭和結尾,其次你的代碼會得到什麼輸出? – hardillb

回答

-1

這不是最終的MQTT客戶端的問題。 代碼錯了,第二條消息沒有寫入數據庫。

爲了得到它的工作我不得不更換以下行:與這一個

pending[patient.id]['record'].__dict__[sensor_id] = payload 

setattr(pending[patient.id]['record'],sensor_id,payload) 

另外刪除行:

session = Session(engine) 

外的on_message函數。

我還加了一行:

session.expunge_all() 

線以下:

session.commit() 

爲了每一個交易數據庫進行一次清理會話。