2017-09-02 78 views
1

我的功能run_tasks(all_tasks, window_size),它利用asyncio任務的生成,並返回他們的價值,同時:ASYNCIO而維持秩序和處理錯誤運行多個任務

  1. 運行從all_tasks每個窗口(大小window_size)同時
  2. 保留返回結果的順序(all_tasks[i]結果爲results[i]
  3. 處理每個r的異常聯合國

我目前的執行情況:

import asyncio 
from itertools import islice 


# run all tasks and return their results in the same order 
# window is the max number of tasks that will run in parallel 
def run_tasks(all_tasks, window_size=4): 
    loop = asyncio.get_event_loop() 

    while True: 
     window_tasks = list(islice(all_tasks, window_size)) 
     if not window_tasks: 
      break 

     futures = asyncio.wait(window_tasks, loop=loop) 
     finished, unfinished = loop.run_until_complete(futures) 

     # sort finished tasks by their launch order. 
     # removing this line makes returned tasks unordered 
     finished = sorted(finished, key=lambda f: window_tasks.index(f._coro)) 

     for finished_task in finished: 
      try: 
       yield finished_task.result() 
      except Exception as e: 
       yield repr(e) 

# Example Usage: 

# a coroutine that sometime raises exception 
async def sleepy(i): 
    print(f'{i} started') 
    await asyncio.sleep(10 - i) 
    print(f'{i} finished') 
    if i == 5: 
     raise ValueError('5 is the worst') 
    return i 

# a generator of tasks 
all_tasks = (sleepy(i) for i in range(10)) 

for result in list(run_tasks(all_tasks)): 
    print(result) 

的問題

我的執行的問題是,我不能找到一種方法,而無需訪問f._coro這是asyncio.Task內部屬性的任務進行排序目的。

# removing this line makes returned tasks unordered 
finished = sorted(finished, key=lambda f: window_tasks.index(f._coro)) 

我可以使用asyncio.gather(*tasks)但這不會處理錯誤。

對於如何實現這三個屬性run_tasks()而無需訪問f._coro的建議,我很樂意提供。

回答

1

asyncio.gathercan如果您指定它的關鍵字參數return_exceptions,將返回錯誤。爲了區分真正的異常從異常對象返回的協同程序的結果,你可以使用ensure_future包裝你window_tasks與任務:

futures = [asyncio.ensure_future(t, loop=loop) for t in window_tasks] 
gathered = asyncio.gather(*futures, loop=loop, return_exceptions=True) 
loop.run_until_complete(gathered) 

for fut in futures: 
    try: 
     yield fut.result() 
    except Exception as e: 
     yield repr(e) 
+0

,謝謝,我確實注意到了'return_exceptions' PARAM,但你不能區分由它返回例外 – ShmulikA

+0

'但你不能區分它返回的異常 - 你可以。只需嘗試我發佈的代碼。我們需要'return_exceptions'只是不會通過'asyncio.gather'引發異常。但是我們不檢查'收集'的結果,我們檢查我們創建的'期貨'的結果。他們將在'fut.result()'調用中返回正常的協程結果,並在協程中引發異常時引發異常。 –

+0

是的它是正確的,我測試它,它的工作原理。我剛剛注意到,我嘗試使用return_exceptions並嘗試查看結果,但是如您所記下的,當返回值爲類型異常時可能會導致問題。關鍵是看未來,而不是'loop.run_until_complete'的結果 – ShmulikA