您複製的代碼缺少標題部分,這非常重要。
# This is *not* a @coroutine! It is just an iterator (yielding Futures).
def as_completed(fs, *, loop=None, timeout=None):
"""Return an iterator whose values are coroutines.
When waiting for the yielded coroutines you'll get the results (or
exceptions!) of the original Futures (or coroutines), in the order
in which and as soon as they complete.
This differs from PEP 3148; the proper way to use this is:
for f in as_completed(fs):
result = yield from f # The 'yield from' may raise.
# Use result.
If a timeout is specified, the 'yield from' will raise
TimeoutError when the timeout occurs before all Futures are done.
Note: The futures 'f' are not necessarily members of fs.
"""
if futures.isfuture(fs) or coroutines.iscoroutine(fs):
raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
loop = loop if loop is not None else events.get_event_loop()
todo = {ensure_future(f, loop=loop) for f in set(fs)}
from .queues import Queue # Import here to avoid circular import problem.
done = Queue(loop=loop)
timeout_handle = None
def _on_timeout():
for f in todo:
f.remove_done_callback(_on_completion)
done.put_nowait(None) # Queue a dummy value for _wait_for_one().
todo.clear() # Can't do todo.remove(f) in the loop.
def _on_completion(f):
if not todo:
return # _on_timeout() was here first.
todo.remove(f)
done.put_nowait(f)
if not todo and timeout_handle is not None:
timeout_handle.cancel()
@coroutine
def _wait_for_one():
f = yield from done.get()
if f is None:
# Dummy value from _on_timeout().
raise futures.TimeoutError
return f.result() # May raise f.exception().
for f in todo:
f.add_done_callback(_on_completion)
if todo and timeout is not None:
timeout_handle = loop.call_later(timeout, _on_timeout)
for _ in range(len(todo)):
yield _wait_for_one()
[哪裏循環實際運行?]
對於semplicity起見,假設超時設置爲無。
as_completed期望可迭代期貨,而不是協同程序。所以這個期貨已經被綁定到循環並且被安排執行。換句話說,那些期貨是loop.create_task或asyncio.ensure_futures的輸出(並且這是無處顯示的)。 因此,循環已經「運行」了它們,當它們完成時,它們的未來.done()方法將返回True。
然後創建「完成」隊列。請注意,「完成」隊列是asyncio.queue的遺傳,即使用循環«實現阻塞方法(.get,.put)»的隊列。
通過「todo = {...」這一行,每個協程的未來(這是fs的一個元素)被綁定到另一個未來»綁定到循環«,並且這個最後的未來的done_callback被設置爲調用_on_completion功能。
當循環將完成協程的執行時,將調用_on_completion函數,該協程的期限在設置爲as_completed函數的「fs」中傳遞。
_on_completion函數從待辦事項集中移除「我們的未來」,並將其結果(即未來在「fs」集中的協同程序)放入完成隊列中。 換句話說,as_completed函數所做的全部工作就是將這些期貨附加到done_callback中,以便將原始未來的結果移入已完成的隊列。
然後,對於len(fs)== len(todo)次,as_completed函數會生成一個協程,用於阻止「從完成產出」。get()「,等待_on_completed(或_on_timeout)函數將結果放入完成隊列中。
as_completed調用者執行的」yield from「s將等待顯示結果在
完成的隊列。[哪裏是產量從何而來?]
它來自於一個事實,即待辦事項是asyncio.queue,這樣你就可以(asyncio-)塊,直到值。把( )在隊列中。