2017-03-19 23 views
1

我有這段代碼,我想連接到一個websocket,並通過發送心跳保持連接活着。雖然這樣做,我也希望能夠發送有效載荷WebSocket的,但我一直陷入等待我keepAlive協程如何運行一個不停止的協程,而不會等待結果呢?

import asyncio 
import websockets 
ws=#path goes here 
async def sendHeartbeat(socket): 
    message={ 
     "op": 1, 
     "d": 0 
     } 
    await socket.send(json.dumps(message)) 
    print("heartbeat sent") 


async def keepAlive(socket): 
    while 1==1: 
     await sendHeartbeat(socket) 
     resp=await(socket.recv()) 
     print(resp) 
     if json.loads(resp)["op"]==11: 
      #the server sent an ack back 
      print("ack received") 
     else: 
      socket.close() 
     await asyncio.sleep(10) 




async def beginSocket(loop): 
    async with websockets.connect(ws) as socket: 
     print(await socket.recv()) 
     await keepAlive(socket) 
     #payloads would be sent here 
     print("Hey payloads being sent and stuff") 

loop = asyncio.get_event_loop() 
loop.run_until_complete(beginSocket(loop)) 

的結果有了這塊然而代碼,打印語句等待keepAlive是從來沒有後打印。我怎樣才能讓代碼不會等待keepAlive的結果?

+0

我的建議是在嘗試更復雜之前理解並執行示例:https://docs.python.org/3/library/asyncio-task.html。換句話說,從Hello World開始。 :) 祝你好運! – ostergaard

回答

0

雖然keepAlive(socket)會立刻返回,因爲keepAlive是couroutine,await keepAlive(socket)就再也沒有回到因爲keepAlive()包括一個無限循環。

而不是使用await,請嘗試asyncio.ensure_future(keepAlive(socket))

如果您確實想使用await keepAlive(socket),請嘗試從其他地方發送您的有效載荷(可能事先使用asyncio.ensure_future(send_payload(socket)))。

0

這種情況下,您在概念上在同一個套接字上有兩個單獨的對話。一個對話是你的心跳和回覆消息。另一個是你發送的其他數據包。

我會保留3個獨立的頂層(即直接向事件循環報告)任務。我希望他們都保留一位協調員的參考資料,這樣他們中的任何一位都可以取消所有其他協議人員。

  1. 心跳任務,基本上你的keepAlive函數。
  2. 處理您通過websocket進行的其他對話的任務。
  3. 從websocket複用讀取的任務。

複用任務的工作是將消息路由到適當的任務。心跳任務應該只能獲得心跳響應,而其他任務應該獲得所有其他消息。

由於websockets已經發送了消息,所以您只能使用sendrecv整個消息,但它可能具有的其他作業並不相關。

這裏有一種方法可以寫這個。

import asyncio 
import websockets 
ws=#path goes here 

class RoutingTask(object): 
    def __init__(self, sock, defaultQueue, **kwargs): 
     super().__init__(**kwargs) 
     self.sock = sock 
     self.defaultQueue = defaultQueue # The queue all messages not otherwise matched go to. 
     self.matchers = [] 

    async def run(self): 
     while True: 
      msg = await self.sock.recv() 
      msg = json.loads(msg) 
      matched = False 
      for matcher, queue in matchers: 
       if matcher(msg): 
        await queue.put(msg) 
        matched = True 
        break 
      if not matched: 
       await self.defaultQueue.put(msg) 

    def addMatcher(self, matcher, queue): 
     el = (matcher, queue) 
     self.matchers.append(el) 

async def heartbeatTask(wssock, incomingq): 
    message=json.dumps({ 
     "op": 1, 
     "d": 0 
     }) # Do this just once. 
    while True: 
     await wssock.send(message) 
     print("heartbeat sent") 
     response = await asyncio.wait_for(incomingq.get(), 10) # Wait 10 seconds for response. 
     assert response['op'] == 11 
     print("heartbeat response received.") 
     await asyncio.sleep(10) # Wait 10 seconds to send another heartbeat. 

async def beginSocket(loop): 
    def heartbeatMatcher(jsondict): 
     return jsondict.get('op', None) == 11 

    async with websockets.connect(ws) as socket: 
     myq = asyncio.Queue(maxsize=1) 
     heartbeatq = asyncio.Queue(maxsize=1) 
     router = RoutingTask(socket, myq) 
     router.addMatcher(heartbeatMatcher, heartbeatq) 
     router = asyncio.ensure_future(router.run()) 
     heartbeat = asyncio.ensure_future(heartbeatTask(socket, heartbeatq) 

     print(await myq.get()) 
     #payloads would be sent here 
     print("Hey payloads being sent and stuff") 

    heartbeat.cancel() # Stop the heartbeat 
    router.cancel() # Stop the router task 

loop = asyncio.get_event_loop() 
loop.run_until_complete(beginSocket(loop)) 

這裏有一些問題。如果拋出異常,則heartbeatrouter任務可能不會被取消。他們也沒有真正好的方法來將問題報告回主要的beginSocket任務。這基本上是一種快速和骯髒的一次性來演示如何做你想做的事情。

在我看來,asyncio.ensure_future是錯誤的。它所做的是告訴事件循環,有一件事情需要繼續運行。它基本上啓動了一個線程的協程等價物。

相關問題