這種情況下,您在概念上在同一個套接字上有兩個單獨的對話。一個對話是你的心跳和回覆消息。另一個是你發送的其他數據包。
我會保留3個獨立的頂層(即直接向事件循環報告)任務。我希望他們都保留一位協調員的參考資料,這樣他們中的任何一位都可以取消所有其他協議人員。
- 心跳任務,基本上你的
keepAlive
函數。
- 處理您通過websocket進行的其他對話的任務。
- 從websocket複用讀取的任務。
複用任務的工作是將消息路由到適當的任務。心跳任務應該只能獲得心跳響應,而其他任務應該獲得所有其他消息。
由於websockets已經發送了消息,所以您只能使用send
或recv
整個消息,但它可能具有的其他作業並不相關。
這裏有一種方法可以寫這個。
import asyncio
import websockets
ws=#path goes here
class RoutingTask(object):
def __init__(self, sock, defaultQueue, **kwargs):
super().__init__(**kwargs)
self.sock = sock
self.defaultQueue = defaultQueue # The queue all messages not otherwise matched go to.
self.matchers = []
async def run(self):
while True:
msg = await self.sock.recv()
msg = json.loads(msg)
matched = False
for matcher, queue in matchers:
if matcher(msg):
await queue.put(msg)
matched = True
break
if not matched:
await self.defaultQueue.put(msg)
def addMatcher(self, matcher, queue):
el = (matcher, queue)
self.matchers.append(el)
async def heartbeatTask(wssock, incomingq):
message=json.dumps({
"op": 1,
"d": 0
}) # Do this just once.
while True:
await wssock.send(message)
print("heartbeat sent")
response = await asyncio.wait_for(incomingq.get(), 10) # Wait 10 seconds for response.
assert response['op'] == 11
print("heartbeat response received.")
await asyncio.sleep(10) # Wait 10 seconds to send another heartbeat.
async def beginSocket(loop):
def heartbeatMatcher(jsondict):
return jsondict.get('op', None) == 11
async with websockets.connect(ws) as socket:
myq = asyncio.Queue(maxsize=1)
heartbeatq = asyncio.Queue(maxsize=1)
router = RoutingTask(socket, myq)
router.addMatcher(heartbeatMatcher, heartbeatq)
router = asyncio.ensure_future(router.run())
heartbeat = asyncio.ensure_future(heartbeatTask(socket, heartbeatq)
print(await myq.get())
#payloads would be sent here
print("Hey payloads being sent and stuff")
heartbeat.cancel() # Stop the heartbeat
router.cancel() # Stop the router task
loop = asyncio.get_event_loop()
loop.run_until_complete(beginSocket(loop))
這裏有一些問題。如果拋出異常,則heartbeat
和router
任務可能不會被取消。他們也沒有真正好的方法來將問題報告回主要的beginSocket
任務。這基本上是一種快速和骯髒的一次性來演示如何做你想做的事情。
在我看來,asyncio.ensure_future
是錯誤的。它所做的是告訴事件循環,有一件事情需要繼續運行。它基本上啓動了一個線程的協程等價物。
我的建議是在嘗試更復雜之前理解並執行示例:https://docs.python.org/3/library/asyncio-task.html。換句話說,從Hello World開始。 :) 祝你好運! – ostergaard