2013-12-23 26 views
11

我在執行新的Python asyncio模塊的asyncio.Protocol.data_received回調中執行異步程序時遇到問題。調用asyncio.Protocol.data_received中的協同程序

考慮以下服務:

class MathServer(asyncio.Protocol): 

    @asyncio.coroutine 
    def slow_sqrt(self, x): 
     yield from asyncio.sleep(1) 
     return math.sqrt(x) 

    def fast_sqrt(self, x): 
     return math.sqrt(x) 

    def connection_made(self, transport): 
     self.transport = transport 

    #@asyncio.coroutine 
    def data_received(self, data): 
     print('data received: {}'.format(data.decode())) 
     x = json.loads(data.decode()) 
     #res = self.fast_sqrt(x) 
     res = yield from self.slow_sqrt(x) 
     self.transport.write(json.dumps(res).encode('utf8')) 
     self.transport.close() 
與下面的客戶端使用

class MathClient(asyncio.Protocol): 

    def connection_made(self, transport): 
     transport.write(json.dumps(2.).encode('utf8')) 

    def data_received(self, data): 
     print('data received: {}'.format(data.decode())) 

    def connection_lost(self, exc): 
     asyncio.get_event_loop().stop() 

隨着self.fast_sqrt被調用,一切正常。

self.slow_sqrt,它不起作用。

它也不適用於self.fast_sqrt@asyncio.coroutine修飾器data_received

我覺得我錯過了一些基本的東西。

完整的代碼是在這裏:

與測試:

  • 的Python 3.4.0b1(視窗)
  • 的Python 3.3.3 + asyncio-0.2.1(FreeBSD)

這兩個問題是一樣的:與slow_sqrt,客戶端/服務器將掛起什麼都不做。

回答

6

看起來,這需要通過Future解耦 - 儘管我仍不確定這是否正確。

class MathServer(asyncio.Protocol): 

    @asyncio.coroutine 
    def slow_sqrt(self, x): 
     yield from asyncio.sleep(2) 
     return math.sqrt(x) 

    def fast_sqrt(self, x): 
     return math.sqrt(x) 

    def consume(self): 
     while True: 
     self.waiter = asyncio.Future() 
     yield from self.waiter 
     while len(self.receive_queue): 
      data = self.receive_queue.popleft() 
      if self.transport: 
       try: 
        res = self.process(data) 
        if isinstance(res, asyncio.Future) or \ 
        inspect.isgenerator(res): 
        res = yield from res 
       except Exception as e: 
        print(e) 

    def connection_made(self, transport): 
     self.transport = transport 
     self.receive_queue = deque() 
     asyncio.Task(self.consume()) 

    def data_received(self, data): 
     self.receive_queue.append(data) 
     if not self.waiter.done(): 
     self.waiter.set_result(None) 
     print("data_received {} {}".format(len(data), len(self.receive_queue))) 

    def process(self, data): 
     x = json.loads(data.decode()) 
     #res = self.fast_sqrt(x) 
     res = yield from self.slow_sqrt(x) 
     self.transport.write(json.dumps(res).encode('utf8')) 
     #self.transport.close() 

    def connection_lost(self, exc): 
     self.transport = None 

下面是一個answer由Guido van Rossum的:

的解決方案是簡單:寫邏輯作爲單獨的方法與@coroutine標記 ,以及使用 async()data_received()火其關閉(== Task() , 在這種情況下)。之所以這樣做並不是將 內置到協議中,是因爲如果是這樣,它將需要使用替代事件 循環實現來處理協程。

全部代碼是在這裏: - Client - Server