我需要處理來自網絡攝像頭的幀並將幾個選定的幀發送到遠程websocket服務器。服務器立即回覆確認消息(很像回聲服務器)。 幀處理速度慢,CPU密集型,所以我想使用獨立的線程池(製作者)來使用它來使用所有可用的內核。所以客戶(消費者)只是閒置,直到池有東西要發送。 我的當前實現,見下文,只有在的生產者測試循環內添加一個小睡時才能正常工作。如果我刪除這個延遲,我停止接收任何答案從服務器(回聲服務器和我的真實服務器)。即使第一個答案是失敗的,所以我不認爲這是一個防洪機制。 我在做什麼錯?龍捲風websocket客戶端丟失響應消息?
import tornado
from tornado.websocket import websocket_connect
from tornado import gen, queues
import time
class TornadoClient(object):
url = None
onMessageReceived = None
onMessageSent = None
ioloop = tornado.ioloop.IOLoop.current()
q = queues.Queue()
def __init__(self, url, onMessageReceived, onMessageSent):
self.url = url
self.onMessageReceived = onMessageReceived
self.onMessageSent = onMessageSent
def enqueueMessage(self, msgData, binary=False):
print("TornadoClient.enqueueMessage")
self.ioloop.add_callback(self.addToQueue, (msgData, binary))
print("TornadoClient.enqueueMessage done")
@gen.coroutine
def addToQueue(self, msgTuple):
yield self.q.put(msgTuple)
@gen.coroutine
def main_loop(self):
connection = None
try:
while True:
while connection is None:
try:
print("Connecting...")
connection = yield websocket_connect(self.url)
print("Connected " + str(connection))
except Exception, e:
print("Exception on connection " + str(e))
connection = None
print("Retry in a few seconds...")
yield gen.Task(self.ioloop.add_timeout, time.time() + 3)
try:
print("Waiting for data to send...")
msgData, binaryVal = yield self.q.get()
print("Writing...")
sendFuture = connection.write_message(msgData, binary=binaryVal)
print("Write scheduled...")
finally:
self.q.task_done()
yield sendFuture
self.onMessageSent("Sent ok")
print("Write done. Reading...")
msg = yield connection.read_message()
print("Got msg.")
self.onMessageReceived(msg)
if msg is None:
print("Connection lost")
connection = None
print("main loop completed")
except Exception, e:
print("ExceptionExceptionException")
print(e)
connection = None
print("Exit main_loop function")
def start(self):
self.ioloop.run_sync(self.main_loop)
print("Main loop completed")
######### TEST METHODS #########
def sendMessages(client):
time.sleep(2) #TEST only: wait for client startup
while True:
client.enqueueMessage("msgData", binary=False)
time.sleep(1) # <--- comment this line to break it
def testPrintMessage(msg):
print("Received: " + str(msg))
def testPrintSentMessage(msg):
print("Sent: " + msg)
if __name__=='__main__':
from threading import Thread
client = TornadoClient("ws://echo.websocket.org", testPrintMessage, testPrintSentMessage)
thread = Thread(target = sendMessages, args = (client,))
thread.start()
client.start()
我真正的問題
在我真正的程序中,我使用「窗口像」機制,以保護消費者(的autobahn.twisted.websocket服務器):製片人最多可以發送的最大數量未確認消息(網絡攝像頭幀),然後停止等待一半的窗口釋放。 消費者發送一個「PROCESSED」消息迴應一個或多個消息(只是一個計數器,而不是ID)。 我在消費者日誌中看到的是消息被處理並且答覆被髮回,但是這些消息在網絡中的某處消失。
我對asynchio幾乎沒有什麼經驗,所以我想確保我不缺少任何產量,註釋或其他東西。
這是消費者端日誌:
2017-05-13 18:59:54+0200 [-] TX Frame to tcp4:192.168.0.5:48964 : fin = True, rsv = 0, opcode = 1, mask = -, length = 21, repeat_length = None, chopsize = None, sync = False, payload = {"type": "PROCESSED"}
2017-05-13 18:59:54+0200 [-] TX Octets to tcp4:192.168.0.5:48964 : sync = False, octets = 81157b2274797065223a202250524f434553534544227d
好吧,我明白了。這個最小的例子是一個極端的:)但爲什麼我看到這與我的真實代碼相同的行爲?我會爲我的問題添加一些額外的信息。 – lorenzo