2017-05-13 77 views
0

我需要處理來自網絡攝像頭的幀並將幾個選定的幀發送到遠程websocket服務器。服務器立即回覆確認消息(很像回聲服務器)。 幀處理速度慢,CPU密集型,所以我想使用獨立的線程池(製作者)來使用它來使用所有可用的內核。所以客戶(消費者)只是閒置,直到池有東西要發送。 我的當前實現,見下文,只有在的生產者測試循環內添加一個小睡時才能正常工作。如果我刪除這個延遲,我停止接收任何答案從服務器(回聲服務器和我的真實服務器)。即使第一個答案是失敗的,所以我不認爲這是一個防洪機制。 我在做什麼錯?龍捲風websocket客戶端丟失響應消息?

import tornado 
from tornado.websocket import websocket_connect 
from tornado import gen, queues 

import time 

class TornadoClient(object): 

    url = None 
    onMessageReceived = None 
    onMessageSent = None 

    ioloop = tornado.ioloop.IOLoop.current() 
    q = queues.Queue() 

    def __init__(self, url, onMessageReceived, onMessageSent): 
     self.url = url 
     self.onMessageReceived = onMessageReceived 
     self.onMessageSent = onMessageSent 

    def enqueueMessage(self, msgData, binary=False): 
     print("TornadoClient.enqueueMessage") 

     self.ioloop.add_callback(self.addToQueue, (msgData, binary)) 

     print("TornadoClient.enqueueMessage done") 

    @gen.coroutine 
    def addToQueue(self, msgTuple): 
     yield self.q.put(msgTuple) 

    @gen.coroutine 
    def main_loop(self): 

     connection = None 
     try: 
      while True: 

       while connection is None: 
       try: 
        print("Connecting...") 
        connection = yield websocket_connect(self.url) 
        print("Connected " + str(connection)) 
       except Exception, e: 
        print("Exception on connection " + str(e)) 
        connection = None 
        print("Retry in a few seconds...") 
        yield gen.Task(self.ioloop.add_timeout, time.time() + 3) 

       try: 
        print("Waiting for data to send...") 
        msgData, binaryVal = yield self.q.get() 
        print("Writing...") 
        sendFuture = connection.write_message(msgData, binary=binaryVal) 
        print("Write scheduled...") 
       finally: 
       self.q.task_done() 

       yield sendFuture 
       self.onMessageSent("Sent ok") 

       print("Write done. Reading...") 
       msg = yield connection.read_message() 
       print("Got msg.") 
       self.onMessageReceived(msg) 

       if msg is None: 
       print("Connection lost") 
       connection = None 

      print("main loop completed") 
     except Exception, e: 
      print("ExceptionExceptionException") 
      print(e) 
      connection = None 

     print("Exit main_loop function") 

    def start(self): 
     self.ioloop.run_sync(self.main_loop) 
     print("Main loop completed") 


######### TEST METHODS ######### 

def sendMessages(client): 
    time.sleep(2) #TEST only: wait for client startup 
    while True: 
     client.enqueueMessage("msgData", binary=False) 
     time.sleep(1) # <--- comment this line to break it 

def testPrintMessage(msg): 
    print("Received: " + str(msg)) 

def testPrintSentMessage(msg): 
    print("Sent: " + msg) 


if __name__=='__main__': 

    from threading import Thread 

    client = TornadoClient("ws://echo.websocket.org", testPrintMessage, testPrintSentMessage) 

    thread = Thread(target = sendMessages, args = (client,)) 
    thread.start() 

    client.start() 

我真正的問題

在我真正的程序中,我使用「窗口像」機制,以保護消費者(的autobahn.twisted.websocket服務器):製片人最多可以發送的最大數量未確認消息(網絡攝像頭幀),然後停止等待一半的窗口釋放。 消費者發送一個「PROCESSED」消息迴應一個或多個消息(只是一個計數器,而不是ID)。 我在消費者日誌中看到的是消息被處理並且答覆被髮回,但是這些消息在網絡中的某處消失。

我對asynchio幾乎沒有什麼經驗,所以我想確保我不缺少任何產量,註釋或其他東西。

這是消費者端日誌:

2017-05-13 18:59:54+0200 [-] TX Frame to tcp4:192.168.0.5:48964 : fin = True, rsv = 0, opcode = 1, mask = -, length = 21, repeat_length = None, chopsize = None, sync = False, payload = {"type": "PROCESSED"} 
2017-05-13 18:59:54+0200 [-] TX Octets to tcp4:192.168.0.5:48964 : sync = False, octets = 81157b2274797065223a202250524f434553534544227d 

回答

0

這是整潔的代碼。我相信你需要在你的sendMessages線程中睡眠的原因是,否則,它會盡可能快地以每秒數百萬次的速度呼叫enqueueMessage。由於enqueueMessage的確如此,而不是等待處理入隊的消息,它會盡可能快地調用IOLoop.add_callback,而不會給循環足夠的機會來執行回調。

該循環可能使某些進度在主線程上運行,因爲您實際上並未阻止它。但是sendMessages線程比循環可以處理它們快得多。當循環從隊列中彈出一條消息並開始處理它時,已經添加了數百萬個新的回調,在回到下一階段的消息處理之前,循環必須執行。

因此,對於您的測試代碼,我認爲在線程上調用enqueueMessage之間的睡眠是正確的。

+0

好吧,我明白了。這個最小的例子是一個極端的:)但爲什麼我看到這與我的真實代碼相同的行爲?我會爲我的問題添加一些額外的信息。 – lorenzo