2017-05-18 69 views
0

閱讀this answer,我跑過了asyncio.tasks.as_completed。我不明白這個功能是如何工作的。它被記錄爲一個非異步例程,它按照它們完成的順序返回期貨。 它創建一個與事件循環相關聯的隊列,爲每個將來添加一個完成回調,然後試圖從隊列中獲取與期貨一樣多的項目。asyncio.as_completed如何工作

代碼的核心是如下:

def _on_completion(f): 
     if not todo: 
      return # _on_timeout() was here first. 
     todo.remove(f) 
     done.put_nowait(f) 
     if not todo and timeout_handle is not None: 
      timeout_handle.cancel() 

    @coroutine 
    def _wait_for_one(): 
     f = yield from done.get() 
     if f is None: 
      # Dummy value from _on_timeout(). 
      raise futures.TimeoutError 
     return f.result() # May raise f.exception(). 

    for f in todo: 
     f.add_done_callback(_on_completion) 
    if todo and timeout is not None: 
     timeout_handle = loop.call_later(timeout, _on_timeout) 
    for _ in range(len(todo)): 
     yield _wait_for_one() 

我想理解這段代碼是如何工作的。我最大的疑問是:

  • 循環在哪裏實際運行。我沒有看到任何對loop.run_until_cobmplete或loop.run_forever的調用。那麼循環如何進展?

  • 方法文檔說該方法返回期貨。那你可以把它在as_completed(期貨)類似

    爲F: 結果=產量是F

我無法調和,對在_wait_for_one返回f.result線。記錄的調用約定是否正確?如果是這樣,那麼收益率從哪裏來?

回答

1

您複製的代碼缺少標題部分,這非常重要。

# This is *not* a @coroutine! It is just an iterator (yielding Futures). 
def as_completed(fs, *, loop=None, timeout=None): 
    """Return an iterator whose values are coroutines. 

    When waiting for the yielded coroutines you'll get the results (or 
    exceptions!) of the original Futures (or coroutines), in the order 
    in which and as soon as they complete. 

    This differs from PEP 3148; the proper way to use this is: 

     for f in as_completed(fs): 
      result = yield from f # The 'yield from' may raise. 
      # Use result. 

    If a timeout is specified, the 'yield from' will raise 
    TimeoutError when the timeout occurs before all Futures are done. 

    Note: The futures 'f' are not necessarily members of fs. 
    """ 
    if futures.isfuture(fs) or coroutines.iscoroutine(fs): 
     raise TypeError("expect a list of futures, not %s" % type(fs).__name__) 
    loop = loop if loop is not None else events.get_event_loop() 
    todo = {ensure_future(f, loop=loop) for f in set(fs)} 
    from .queues import Queue # Import here to avoid circular import problem. 
    done = Queue(loop=loop) 
    timeout_handle = None 

    def _on_timeout(): 
     for f in todo: 
      f.remove_done_callback(_on_completion) 
      done.put_nowait(None) # Queue a dummy value for _wait_for_one(). 
     todo.clear() # Can't do todo.remove(f) in the loop. 

    def _on_completion(f): 
     if not todo: 
      return # _on_timeout() was here first. 
     todo.remove(f) 
     done.put_nowait(f) 
     if not todo and timeout_handle is not None: 
      timeout_handle.cancel() 

    @coroutine 
    def _wait_for_one(): 
     f = yield from done.get() 
     if f is None: 
      # Dummy value from _on_timeout(). 
      raise futures.TimeoutError 
     return f.result() # May raise f.exception(). 

    for f in todo: 
     f.add_done_callback(_on_completion) 
    if todo and timeout is not None: 
     timeout_handle = loop.call_later(timeout, _on_timeout) 
    for _ in range(len(todo)): 
     yield _wait_for_one() 

[哪裏循環實際運行?]

對於semplicity起見,假設超時設置爲無。

as_completed期望可迭代期貨,而不是協同程序。所以這個期貨已經被綁定到循環並且被安排執行。換句話說,那些期貨是loop.create_task或asyncio.ensure_futures的輸出(並且這是無處顯示的)。 因此,循環已經「運行」了它們,當它們完成時,它們的未來.done()方法將返回True。

然後創建「完成」隊列。請注意,「完成」隊列是asyncio.queue的遺傳,即使用循環«實現阻塞方法(.get,.put)»的隊列。

通過「todo = {...」這一行,每個協程的未來(這是fs的一個元素)被綁定到另一個未來»綁定到循環«,並且這個最後的未來的done_callback被設置爲調用_on_completion功能。

當循環將完成協程的執行時,將調用_on_completion函數,該協程的期限在設置爲as_completed函數的「fs」中傳遞。

_on_completion函數從待辦事項集中移除「我們的未來」,並將其結果(即未來在「fs」集中的協同程序)放入完成隊列中。 換句話說,as_completed函數所做的全部工作就是將這些期貨附加到done_callback中,以便將原始未來的結果移入已完成的隊列。

然後,對於len(fs)== len(todo)次,as_completed函數會生成一個協程,用於阻止「從完成產出」。get()「,等待_on_completed(或_on_timeout)函數將結果放入完成隊列中。

as_completed調用者執行的」yield from「s將等待顯示結果在

完成的隊列。[哪裏是產量從何而來?]

它來自於一個事實,即待辦事項是asyncio.queue,這樣你就可以(asyncio-)塊,直到值。把( )在隊列中。