1
我的功能run_tasks(all_tasks, window_size)
,它利用asyncio
任務的生成,並返回他們的價值,同時:ASYNCIO而維持秩序和處理錯誤運行多個任務
- 運行從
all_tasks
每個窗口(大小window_size
)同時 - 保留返回結果的順序(
all_tasks[i]
結果爲results[i]
) - 處理每個r的異常聯合國
我目前的執行情況:
import asyncio
from itertools import islice
# run all tasks and return their results in the same order
# window is the max number of tasks that will run in parallel
def run_tasks(all_tasks, window_size=4):
loop = asyncio.get_event_loop()
while True:
window_tasks = list(islice(all_tasks, window_size))
if not window_tasks:
break
futures = asyncio.wait(window_tasks, loop=loop)
finished, unfinished = loop.run_until_complete(futures)
# sort finished tasks by their launch order.
# removing this line makes returned tasks unordered
finished = sorted(finished, key=lambda f: window_tasks.index(f._coro))
for finished_task in finished:
try:
yield finished_task.result()
except Exception as e:
yield repr(e)
# Example Usage:
# a coroutine that sometime raises exception
async def sleepy(i):
print(f'{i} started')
await asyncio.sleep(10 - i)
print(f'{i} finished')
if i == 5:
raise ValueError('5 is the worst')
return i
# a generator of tasks
all_tasks = (sleepy(i) for i in range(10))
for result in list(run_tasks(all_tasks)):
print(result)
的問題
我的執行的問題是,我不能找到一種方法,而無需訪問f._coro
這是asyncio.Task
內部屬性的任務進行排序目的。
# removing this line makes returned tasks unordered
finished = sorted(finished, key=lambda f: window_tasks.index(f._coro))
我可以使用asyncio.gather(*tasks)
但這不會處理錯誤。
對於如何實現這三個屬性run_tasks()
而無需訪問f._coro
的建議,我很樂意提供。
,謝謝,我確實注意到了'return_exceptions' PARAM,但你不能區分由它返回例外 – ShmulikA
'但你不能區分它返回的異常 - 你可以。只需嘗試我發佈的代碼。我們需要'return_exceptions'只是不會通過'asyncio.gather'引發異常。但是我們不檢查'收集'的結果,我們檢查我們創建的'期貨'的結果。他們將在'fut.result()'調用中返回正常的協程結果,並在協程中引發異常時引發異常。 –
是的它是正確的,我測試它,它的工作原理。我剛剛注意到,我嘗試使用return_exceptions並嘗試查看結果,但是如您所記下的,當返回值爲類型異常時可能會導致問題。關鍵是看未來,而不是'loop.run_until_complete'的結果 – ShmulikA