2016-07-07 17 views
0

由於 asyncio 模塊中缺少 recvmsg(),我試圖以與 BaseEventLoop.sock_recv() 完全相同的實現方式重新實現它。如何使用 asyncio 實現 recvmsg()?

import asyncio, socket 


def _sock_recvmsg(loop, fut, registered, sock, bufsize, ancbufsize):
    """Try one ``sock.recvmsg()`` call and settle *fut* with the outcome.

    If *registered* is true, the reader callback for the socket's file
    descriptor is removed from *loop* first. If *fut* is already cancelled,
    nothing else happens. When the socket has no data yet
    (``BlockingIOError`` / ``InterruptedError``), this function is
    re-registered on *loop* as the reader callback for the descriptor so the
    receive is retried later. Any other exception is stored on *fut*; a
    successful receive stores the ``recvmsg()`` return value on *fut*.
    """
    descriptor = sock.fileno()
    if registered:
        loop.remove_reader(descriptor)
    if fut.cancelled():
        return
    try:
        received = sock.recvmsg(bufsize, ancbufsize)
    except (BlockingIOError, InterruptedError):
        # Not readable yet: retry when the loop reports the fd as readable.
        loop.add_reader(
            descriptor, loop._sock_recvmsg,
            fut, True, sock, bufsize, ancbufsize,
        )
    except Exception as error:
        fut.set_exception(error)
    else:
        fut.set_result(received)


def sock_recvmsg(loop, sock, bufsize, ancbufsize=0):
    """Start a ``recvmsg()`` on *sock* and return a future for its result.

    A new future bound to *loop* is created and handed to
    ``loop._sock_recvmsg``, which performs the actual receive attempt.

    Raises:
        ValueError: if the loop's ``_debug`` flag is set and the socket's
            timeout is not 0 (i.e. the socket is not non-blocking).
    """
    if loop._debug:
        if sock.gettimeout() != 0:
            raise ValueError('the socket must be non-blocking')
    pending = asyncio.futures.Future(loop=loop)
    loop._sock_recvmsg(pending, False, sock, bufsize, ancbufsize)
    return pending


# Attach both helpers to the Unix selector event loop class so they can be
# called as methods on loop instances.
_selector_loop_cls = asyncio.unix_events._UnixSelectorEventLoop
_selector_loop_cls._sock_recvmsg = _sock_recvmsg
_selector_loop_cls.sock_recvmsg = sock_recvmsg

但下面這個簡單的測試失敗了:它只接收到第一個值,之後就掛起,即:

async def produce():
    """Send the single bytes 0..4 over ``end_out``, one per second, then close it."""
    for value in range(5):
        payload = bytes([value])
        end_out.sendmsg([payload])
        await asyncio.sleep(1)

    end_out.close()

async def consume():
    """Print each 1-byte message received on ``end_in`` until an empty read."""
    while True:
        loop = asyncio.get_event_loop()
        data, _ancdata, _flags, _address = await loop.sock_recvmsg(end_in, 1)
        if data == b'':
            # An empty payload ends the loop.
            break
        print(data)

if __name__ == '__main__':
    end_in, end_out = socket.socketpair()
    # Both ends must be non-blocking. On a blocking socket recvmsg() never
    # raises BlockingIOError; it blocks the whole event loop instead, so the
    # producer task cannot run and the program hangs after the first value.
    end_in.setblocking(False)
    end_out.setblocking(False)
    asyncio.ensure_future(produce())
    asyncio.get_event_loop().run_until_complete(consume())

我錯過了什麼?

回答

0

與 recvmsg() 相關的代碼是正確的,但我漏掉了下面這兩行:

# Put both ends of the socket pair into non-blocking mode.
end_in.setblocking(False)
end_out.setblocking(False)

這個錯誤實在太愚蠢了,所以我在考慮刪除這個問題。