asyncio.Task.all_tasks()給出了一個事件循環的所有任務的列表,但我找不到任何類似的套接字,特別是與數據報關聯的套接字一個循環?如何檢查asyncio循環是否有任何關聯的套接字
沒有插座&任務然後可以表示循環的「生命結束」。
的問題是,在下面的例子中,要放什麼loop_not_empty()
,使得它返回False
當任務設置爲空和沒有關聯的插座(即兩秒鐘後)
例子:
import asyncio
import socket
import threading
class Handler(asyncio.Protocol):
def connection_made(self, transport):
self.transport = transport
print("connection made")
def datagram_received(self, data, addr):
if data == b'die':
print("shutting down")
self.transport.abort()
@asyncio.coroutine
def sometask():
yield from asyncio.sleep(1)
print("task done")
def loop_not_empty(l):
# if asyncio.Task.all_tasks() == set() and WHAT_GOES_HERE
# return False
return True
def main():
a,b = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
l = asyncio.get_event_loop()
asyncio.ensure_future(sometask(), loop=l)
asyncio.ensure_future(l.create_datagram_endpoint(Handler, sock=a), loop=l)
threading.Timer(2, lambda: b.send(b'die')).start()
while loop_not_empty(l):
l.run_until_complete(asyncio.sleep(1, loop=l))
main()
這個不清楚。請提供一段代碼,您希望執行此「生命週期」檢查。 – Udi
試圖澄清一個例子。 – ttyridal