2017-01-16

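I have a pool_map function that can be used to limit the number of functions executing at the same time. How can I make such an asyncio pool cancellable?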

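The idea is to have a coroutine function that takes a single parameter and is mapped over a list of possible arguments, while also wrapping every function call in a semaphore acquisition, so that only a limited number of calls run at once: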

from asyncio import Semaphore
from typing import Awaitable, Callable, Iterable, Iterator, TypeVar

A = TypeVar('A')
V = TypeVar('V')

def pool_map(
    func: Callable[[A], Awaitable[V]],
    arg_it: Iterable[A],
    size: int = 10,
) -> Iterator[Awaitable[V]]:
    """
    Maps an async function to iterables
    ensuring that only some are executed at once.
    """
    semaphore = Semaphore(size)

    async def sub(arg: A) -> V:
        # At most `size` calls can hold the semaphore at the same time.
        async with semaphore:
            return await func(arg)

    # Lazily yields one coroutine per argument; nothing runs until awaited.
    return map(sub, arg_it)

For the sake of the example I modified the code above without testing it, but my own variant works well. For example, you can use it like this:

import asyncio
from asyncio import as_completed

URLS = [...]

async def run_all(awaitables):
    # Print each result as soon as its call finishes.
    for a in as_completed(awaitables):
        result = await a
        print('got result', result)

async def download(url): ...


async def main():
    # Build the pool inside the running loop, then consume it.
    pool = pool_map(download, URLS)
    await run_all(pool)


if __name__ == '__main__':
    asyncio.run(main())
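
A quick, self-contained way to check that the semaphore really caps concurrency is to count how many calls are inside it at once. This is a sketch, not code from the question: `probe`, `running` and `max_running` are hypothetical names, and `asyncio.gather` (which wraps each coroutine in a Task) stands in for `run_all`. With `size=3`, the counter should never go above 3.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, Iterator, TypeVar

A = TypeVar("A")
V = TypeVar("V")


def pool_map(
    func: Callable[[A], Awaitable[V]],
    arg_it: Iterable[A],
    size: int = 10,
) -> Iterator[Awaitable[V]]:
    semaphore = asyncio.Semaphore(size)

    async def sub(arg: A) -> V:
        async with semaphore:
            return await func(arg)

    return map(sub, arg_it)


running = 0      # calls currently inside the semaphore
max_running = 0  # highest value `running` ever reached


async def probe(n: int) -> int:
    # Hypothetical stand-in for a real download that tracks concurrency.
    global running, max_running
    running += 1
    max_running = max(max_running, running)
    await asyncio.sleep(0.01)
    running -= 1
    return n * 2


async def main() -> list:
    # gather() wraps each coroutine in a Task; the semaphore limits how many run.
    return await asyncio.gather(*pool_map(probe, range(10), size=3))


results = asyncio.run(main())
print(results, max_running)
```

Because `map` is lazy, the coroutines are only created when `gather` unpacks the iterator, i.e. inside the running loop.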

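But a problem arises if an exception is thrown while awaiting a future. I can't see how to cancel all the tasks that are scheduled or still running, nor the ones still waiting to acquire the semaphore.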
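
The problem can be reproduced in a few lines. In this sketch (the names `boom`, `slow` and `finished` are assumptions, not from the question), one call fails immediately while three others are still sleeping. `as_completed` has already wrapped all of them in Tasks, so when the exception escapes `run_all`, nothing cancels the remaining ones and they keep running in the background.

```python
import asyncio

finished = []  # records which slow calls ran to completion


async def slow(n: int) -> int:
    await asyncio.sleep(0.05)
    finished.append(n)
    return n


async def boom() -> None:
    raise RuntimeError("boom!")


async def run_all(awaitables) -> None:
    # Same shape as the question's run_all: the first failure propagates.
    for a in asyncio.as_completed(awaitables):
        print("got result", await a)


async def main() -> None:
    try:
        await run_all([boom(), slow(1), slow(2), slow(3)])
    except RuntimeError as e:
        print("caught", e)
    # Nothing cancelled the other tasks, so give them time to finish anyway.
    await asyncio.sleep(0.2)


asyncio.run(main())
print(sorted(finished))
```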

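Is there a library or an elegant building block for this that I'm not aware of, or do I have to build all the parts myself? (e.g. a Semaphore that gives access to its waiters, an as_finished that gives access to its queue of running tasks, ...)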

Answers

Use ensure_future to get a Task instead of a coroutine; unlike a bare coroutine object, a Task can be cancelled:

import asyncio


def pool_map(func, args, size=10):
    """
    Maps an async function to iterables
    ensuring that only some are executed at once.
    """
    semaphore = asyncio.Semaphore(size)

    async def sub(arg):
        async with semaphore:
            return await func(arg)

    # Wrap every call in a Task right away, so each one can be cancelled later.
    tasks = [asyncio.ensure_future(sub(x)) for x in args]

    return tasks


async def f(n):
    print(">>> start", n)

    if n == 7:
        raise Exception("boom!")

    await asyncio.sleep(n/10)

    print("<<< end", n)
    return n


async def run_all(tasks):
    exc = None
    for a in asyncio.as_completed(tasks):
        try:
            result = await a
            print('=== result', result)
        except asyncio.CancelledError as e:
            print("!!! cancel", e)
        except Exception as e:
            print("Exception in task, cancelling!")
            # cancel() is harmless on tasks that have already finished.
            for t in tasks:
                t.cancel()
            exc = e
    # Re-raise the failure only after every task has settled.
    if exc:
        raise exc


async def main():
    # Create the tasks inside the running loop.
    pool = pool_map(f, range(1, 20), 3)
    await run_all(pool)


asyncio.run(main())
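
The question also asked about calls still waiting for the semaphore. With real Tasks, cancelling them is enough: a cancelled task that is blocked in `async with semaphore` gives up its place in the queue and never calls `func`. A minimal sketch under assumed names (`started` records which arguments actually began; `size=2` so most tasks have to queue):

```python
import asyncio

started = []  # arguments whose function body actually began


async def f(n):
    started.append(n)
    await asyncio.sleep(0.05)
    return n


def pool_map(func, args, size=10):
    semaphore = asyncio.Semaphore(size)

    async def sub(arg):
        async with semaphore:
            return await func(arg)

    return [asyncio.ensure_future(sub(x)) for x in args]


async def main():
    tasks = pool_map(f, range(10), size=2)
    await asyncio.sleep(0)  # let the tasks start; only 2 get the semaphore
    for t in tasks:
        t.cancel()
    # return_exceptions=True collects the CancelledErrors instead of raising.
    return await asyncio.gather(*tasks, return_exceptions=True)


results = asyncio.run(main())
print(started)
```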

Here's a naive solution, based on the fact that cancel is a no-op if the task has already finished:

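import asyncio


async def run_all(awaitables):
    # Wrap everything in Tasks up front so each one can be cancelled.
    futures = [asyncio.ensure_future(a) for a in awaitables]
    try:
        for fut in asyncio.as_completed(futures):
            result = await fut
            print('got result', result)
    except BaseException:
        # cancel() does nothing to tasks that have already finished.
        for future in futures:
            future.cancel()
        # Wait until the cancellations have actually taken effect.
        await asyncio.wait(futures)
        raise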
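
The fact this answer relies on is easy to check: `Task.cancel()` returns `False` and leaves the result untouched when the task has already finished. A tiny sketch (`quick` is a hypothetical coroutine):

```python
import asyncio


async def quick():
    return 42


async def main():
    task = asyncio.ensure_future(quick())
    await task                     # the task has finished at this point
    was_cancelled = task.cancel()  # no-op on a finished task: returns False
    return was_cancelled, task.result()


outcome = asyncio.run(main())
print(outcome)  # (False, 42)
```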