我有一個pool_map
函數可以用來限制同時執行的函數的數量。如何使asyncio池可取消?
的想法是具有coroutine function接受被映射到的可能的參數的列表的單個參數,而且還包所有函數調用成旗語取得,於是只有有限數量的運行一次:
from typing import Callable, Awaitable, Iterable, Iterator
from asyncio import Semaphore
A = TypeVar('A')
V = TypeVar('V')
async def pool_map(
func: Callable[[A], Awaitable[V]],
arg_it: Iterable[A],
size: int=10
) -> Generator[Awaitable[V], None, None]:
"""
Maps an async function to iterables
ensuring that only some are executed at once.
"""
semaphore = Semaphore(size)
async def sub(arg):
async with semaphore:
return await func(arg)
return map(sub, arg_it)
爲了舉例,我修改了上面的代碼並沒有對它進行測試,但是我的變體運行良好。例如。你可以這樣使用它:
from asyncio import get_event_loop, coroutine, as_completed
from contextlib import closing
URLS = [...]
async def run_all(awaitables):
for a in as_completed(awaitables):
result = await a
print('got result', result)
async def download(url): ...
if __name__ != '__main__':
pool = pool_map(download, URLS)
with closing(get_event_loop()) as loop:
loop.run_until_complete(run_all(pool))
但是如果在等待未來時拋出異常,就會出現問題。我無法看到如何取消所有計劃或仍在運行的任務,也沒有人正在等待信號量的獲取。
有沒有一個圖書館或這個我不知道的優雅建築模塊,還是我必須自己構建所有零件? (即Semaphore
可以訪問它的服務生,一個as_finished
提供其運行的任務隊列的訪問,...)