2016-01-31 111 views
0

我正在構建一個從串行端口接收字節的python項目。字節是對發送命令的響應(也是通過串行端口)。答案沒有識別標記,即僅來自字節,我不知道這是哪個命令響應。解碼器當然需要事先知道這是一個響應的命令。將字節流反序列化爲對象

我希望傳入的字節序列表示爲嵌套對象,指示幀,標頭,有效載荷,解碼的有效載荷等。我更喜歡每次向解碼器推送1個字節並讓它調用回調一旦它已經收到足夠的字節爲一個完整的對象(或errorCallback如果有錯誤或超時)。

實際的幀有一個開始字節和一個結束字節。它有一個包含幾個字節(一些id,命令狀態(基本上是ok/fail))的頭部,一個是數據長度字節。緊接着是一個校驗和(單字節)的數據。數據是對命令的響應。

響應是可預測的,因爲前面的字節決定了即將到來的字節的含義。

實施例響應:

AA:00:0C:00:01:00:00:D3:8D:D4:5C:50:01:04:E0:6E:BB

分拆下來:

aa: start frame 
    00: id 
    0c: data length (incl status): 12 bytes 
    00: command status (byte 1) 
     01: 1 data frame (byte 2) 
      00:00: flags of first data frame (byte 3-4) 
      d3:8d:d4:5c:50:01:04:e0: first data (aa and bb could be in it) (byte 5-12) 
    6e: checksum (includes all bytes except start, checksum, end bytes) 
bb: end frame 

這是串行端口通信,字節可能丟失(和額外的生產)和我希望使用超時來處理重置(無響應預期沒有被髮送的第一命令)。

我真的想要一個面向對象的方法,其中的解碼器將產生一個對象,當序列化時,會再次產生相同的字節序列。我使用python 2.7,但真正的任何面向對象的語言都可以(只要我可以將它轉換爲python)。

我只是不知道如何構造解碼器,使它看起來整潔。我正在尋找一個完整的解決方案,只是讓我朝着正確的方向前進(正確的方向有點主觀)。

+0

如果你發佈了一些代碼,你更可能對你的問題得到積極迴應。不要擔心,如果它不整齊,或不使用任何先進的技巧。不僅發佈一些代碼表明你已經做出了一個誠實的嘗試來自己解決你的問題,一塊代碼是一個非常有用的方式來顯示你正在嘗試做什麼。 –

+0

你是說你只是想將傳入的固定長度響應解析爲可以轉換回字節的對象(帶有屬性)? – TisteAndii

+0

我確實有一些代碼,但它很可怕,實際上並沒有正常工作。另外,它並沒有滿足我的願望,使它成爲模塊化的。 TisteAndii下面的答案實際上比我的要好。 – galmok

回答

1

我不完全明白你想要做什麼,但如果你想從一些設備接收固定長度的反應,使他們的一些對象的屬性,將這樣的事情會好起來?:

START_FRAME = 0xAA 
END_FRAME = 0xBB 
TIMEOUT = 2 

class Response: 
def __init__(self, response): 
    if (len(response) - 6) % 11 == 0 and response[0] == START_FRAME and response[-1] == END_FRAME: # verify that its a valid response 
     self.header = {} # build header 
     self.header['id'] = response[1] 
     self.header['length'] = response[2] 
     self.header['status'] = response[3] 
     self.r_checksum = response[-2] # get checksum from response 
     self.checksum = self.header['id']^self.header['length']^self.header['status'] # verify the checksum 
     self.payload = response[4:-2] # get raw payload slice 
     self.decode(self.payload) # parse payload 
     if self.r_checksum == self.checksum: # final check 
      self.parsed = True 
     else: 
      self.parsed = False 
    else: # if response didnt follow the pattern 
     self.parsed = False 
def decode(self, packet): # decode payload 
    self.frm_count = packet[0] # get number of data frames 
    self.checksum ^= self.frm_count 
    self.decoded = [] # hold decoded payload 
    frames = packet[1:] 
    for c in range(self.frm_count): # iterate through data frames 
     flags = frames[(c*10):(c*10 + 2)] 
     for f in flags: 
      self.checksum ^= f 
     data = frames[(c*10 + 2):(c+1)*10] 
     for d in data: 
      self.checksum ^= d 
     self.decoded.append({'frame': c+1, 'flags': flags, 'data':data}) 
def serialize(): # reconstruct response 
    res = bytearray() 
    res.append(START_FRAME) 
    res.extend([self.header['id'], self.header['length'], self.header['status']]) 
    res.extend(self.payload) 
    res.extend([self.checksum, END_FRAME]) 
    return res 

response = bytearray() 
ser = serial.Serial('COM3', 9600) # timeout is 2 seconds 
last_read = time.clock() 
while time.clock() - last_read < TIMEOUT: 
    while ser.inWaiting() > 0: 
     response.append(ser.read()) 
     last_read = time.clock() 
decoded_res = Response(response) 
if decoded_res.parsed: 
    # do stuff 
else: 
    print('Invalid response!') 

該代碼假定可能有多個數據幀,數據幀之前是一個指示數據幀數量的字節。 與串行通信(即使在115200波特)相比,解析數據包的速度更快。整個事情大概是O(n),我想。

+0

您的假設對於顯示的回覆是正確的。根據發送的命令,我得到各種不同的答案,必須按自己的方式解碼。通過添加額外的解碼方法(適用於每個命令;您可以擴展代碼以考慮這一點)。我認爲你的建議對我來說是足夠好的,即使這個課程需要一個完整的框架才能開始解碼。我曾希望採用更多面向流的方法。使用超時將字節流分解爲數據包可能會變得不可靠(而且速度慢)。我希望這不會是一個問題。非常感謝! – galmok

+0

也許你可以通過面向流的方式擴展你的意思?在你理解之前,你不需要一個框架是完整的,因爲框架可能會變得不完整/無效?你可以減少超時時間(如果你的波特率足夠高),甚至可以用time.clock()來代替time.sleep()來測量時間間隔,而不是浪費秒。讓我編輯代碼 – TisteAndii

+0

我的擔心主要是如果響應是背靠背的,即只通過查看超時值(因爲沒有),數據包不能被分割。但是,我認爲每次響應之後我都可以承受一小段延遲,然後向設備發送新的請求。我需要儘可能快地發送儘可能多的請求(並獲得儘可能多的回覆)。 1-2個字符時間的超時應該是實惠的。通過面向流的方式,我想到了一種將無盡的流字節發送給類的方法,並且一旦它有一個完整的數據包,就會調用一個回調函數。更多的字節會導致更多的回調。你的代碼是一個很好的起點。 – galmok

相關問題