2017-01-27 188 views
0

我在python中使用UDP進行大量數據包丟失。我知道如果我不想丟包,我應該使用TCP,但是我沒有(完全)控制發送者。用python接收UDP數據包,造成數據包丟失

這是一臺使用UDP多點傳送每秒發送15個圖像的相機。

下面你看到我現在寫的代碼。 它使用多處理來允許生產者和消費者功能並行工作。生產者函數捕獲數據包,消費者函數處理它們並將圖像寫入.bmp文件。

我寫了一個類PacketStream從包字節寫入.bmp文件。

當照相機發送新的圖像時,首先發送一個分組,其中第一字節= 0×01。這包含有關圖像的信息。 然後以第一個字節= 0x02發送612個數據包。這些包含圖像中的字節(508字節/包)。

由於15幅圖像每秒發送,〜9000包每秒被髮送。儘管這種情況發生在每個圖像突發更快的速率下,大約爲22個數據包/毫秒。

我可以得到完全利用的tcpdump或Wireshark的所有數據包。 但使用下面的代碼,數據包被遺漏。 當然,我的Windows 7電腦應該能夠處理這個?我也在樹莓派3上使用它,並且差不多有相同數量的數據包被遺漏。因此我認爲這是代碼的問題。

我已經嘗試了很多不同的東西,比如線程的隊列,而不是管,而不是多。

我也試圖與

sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 3000000) 

增加套接字緩衝區無濟於事。

這是所有可能的蟒蛇?

由於提前,

import time 
from multiprocessing import Process, Queue 
import socket 
import struct 
from PIL import Image 


class PacketStream: 
    def __init__(self, output_path): 
     self.output_path = output_path 
     self.data_buffer = '' 
     self.img_id = -1 # -1 = waiting for start of new image 

    def process(self, data): 
     message_id = data[0] 
     if message_id == '\x01': 
      self.wrap_up_last_image() 
      self.img_id = ord(data[3]) 
      self.data_buffer = '' 
     if message_id == '\x02': 
      self.data_buffer += data[6:] 

    def wrap_up_last_image(self): 
     if self.img_id > 0: 
      n_bytes = len(self.data_buffer) 
      if n_bytes == 307200: 
       global i 
       write_image(self.output_path + str(i).zfill(7) + '_' + str(self.img_id).zfill(3) + '.bmp', 
          self.data_buffer) 
       i += 1 
      else: 
       print 'Image lost: %s bytes missing.' % (307200 - n_bytes) 


def write_image(path, data): 
    im = Image.frombuffer('L', (640, 480), bytearray(data), 'raw', 'L', 0, 1) 
    im.save(path) 
    print time.time(), path 


def producer(q): 
    # setup socket 
    MCAST_GRP = '239.255.83.71' 
    MCAST_PORT = 2271 
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) 
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 
    sock.bind(('', MCAST_PORT)) 
    mreq = struct.pack('4sl', socket.inet_aton(MCAST_GRP), socket.INADDR_ANY) 
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) 
    while True: 
     q.put(sock.recv(512)) 


def consumer(q): 
    packet_stream = PacketStream('D:/bmpdump/') 
    while True: 
     data = q.get() 
     packet_stream.process(data) 

i = 0 
if __name__ == '__main__': 
    q = Queue() 

    t1 = Process(target=producer, args=(q,)) 
    t1.daemon = True # so they stop when the main prog stops 
    t1.start() 
    t2 = Process(target=consumer, args=(q,)) 
    t2.daemon = True 
    t2.start() 

    time.sleep(10.0) 

    print 'Program finished.' 

編輯

感謝所有的建議。

1)我已經試過穿+隊列,也是「」。加入(),似乎並沒有太大的差別。我很確定現在的問題是生產者線程沒有得到足夠的優先級。我無法找到如何使用Python來增加此功能?這甚至有可能嗎?

2)我設法使用下面的代碼只損失大約10%。處理器是在〜25%(對樹莓PI)的關鍵是消費數據,當有數據包流,即在暫停時的最後一個數據包已經到達

import time 
import socket 
import struct 
from PIL import Image 


def write_image(path, data): 
    im = Image.frombuffer('L', (640, 480), bytearray(data), 'raw', 'L', 0, 1) 
    im.save(path) 
    print time.time(), path 

def consume(data_buffer): 
    img_id = ord(data_buffer[0][1]) 
    real_data_buffer = [data[6:] for data in data_buffer] 
    data_string = ''.join(real_data_buffer) 

    global i 
    write_image('/media/pi/exthdd_02/bmpdump/' + str(i).zfill(7) + '_' + str(img_id).zfill(3) + '.bmp', data_string) 
    i += 1 

def producer(sock): 
    print 'Producer start' 
    data_buffer = [] 
    while True: 
     data = sock.recvfrom(512)[0] 
     if data[0] == '\x01': 
      data_buffer = [] 
     else: 
      data_buffer.append(data) 
     if len(data_buffer) == 612: 
      consume(data_buffer) 


# image counter 
i = 0 

# setup socket 
MCAST_GRP = '239.255.83.71' 
MCAST_PORT = 2271 
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) 
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 
sock.bind((MCAST_GRP, MCAST_PORT)) 
mreq = struct.pack("4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY) 
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) 
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 30000000) 

producer(sock) 
+0

我想你在處理UDP套接字時必須使用sock.recvfrom而不是sock.recv。也許這有幫助? –

+0

您可能需要增加UDP數據讀取線程/進程的優先級(在Linux上使用sched_setscheduler()或類似的函數),或者在您可能使用的任何其他操作系統上使用類似的API。我不確定Python API是什麼與之相對應)。這樣一來,您的讀取器線程就不太可能被另一個任務從CPU中取出,這可能會導致完整的接收緩衝區和丟棄的數據包。 –

回答

0

一些建議,以改善你的代碼,但首先是一個問題:你是否測量過可能會減慢速度的東西?比如你看過你係統的CPU使用情況。如果你達到100%,那很可能是數據包丟失的原因。如果它大部分處於閒置狀態,則還有其他事情正在進行,問題與代碼的性能無關。

現在,一些建議,以提高代碼:

  • 使用socket.recvfrom代替sock.recv與UDP套接字打交道時
  • 不要使用與過程多處理,對於必須要發生的系列化送如果我們正在談論約9000個呼叫/秒,那麼從一個流程到另一個流程的數據可能會成爲性能瓶頸。嘗試使用線程改爲(threading + queue模塊)。但是,由於你沒有提供任何觀察到的數字,所以很難說。
  • 不使用字符串連接來建立接收方的緩衝區,因爲它獲取數據包。這會創建大量新的臨時字符串對象並隨時複製數據。相反,將每個數據包追加到一個列表中,並且當您收到所有數據時,它們會一起結尾。