2016-09-08 98 views
2

我想用asyncio來處理併發的網絡I/O。大量的功能將被安排在一個單獨的點上,每個點完成所花費的時間差異很大。接收到的數據然後在每個輸出的獨立過程中處理。等待任何未來的asyncio

數據處理的順序是不相關的,所以考慮到輸出的潛在非常長的等待期,我希望await可以首先取代預定義順序的任何未來結束。

def fetch(x): 
    sleep() 

async def main(): 
    futures = [loop.run_in_executor(None, fetch, x) for x in range(50)] 
    for f in futures: 
     await f 

loop = asyncio.get_event_loop() 
loop.run_until_complete(main()) 

通常情況下,在期貨排隊順序等待是好的:

Well behaved functions profiler graph

藍色代表時間每個任務是執行程序的隊列,即run_in_executor被調用,但功能是尚未執行,因爲執行者只同時運行5個任務;綠色是花在執行功能本身上的時間;紅色是等待所有以前的期貨到await的時間。

Volatile functions profiler graph

在我的情況下功能的時間相差很大,有失去在排隊等待前面的期貨等待,而我可以在本地處理GET輸出了大量的時間。這使得我的系統在一段時間內閒置一段時間,只有當幾個輸出同時完成時纔會被淹沒,然後跳回到空閒狀態,等待更多請求完成。

是否有辦法通過await執行者首先完成的任何未來?

+0

你用什麼來形象化協程執行? :) – PovilasB

+0

@PovilasB大量記錄'time.time()'和PIL – Mirac7

+0

如果您使用的是期貨,我發現[as_completed](https://docs.python.org/3/library/concurrent.futures.html #concurrent.futures.as_completed)在處理事件完成時非常有幫助。 –

回答

3

看起來你好像正在尋找asyncio.waitreturn_when=asyncio.FIRST_COMPLETED

def fetch(x): 
    sleep() 

async def main(): 
    futures = [loop.run_in_executor(None, fetch, x) for x in range(50)] 
    while futures: 
     done, futures = await asyncio.wait(futures, 
      loop=loop, return_when=asyncio.FIRST_COMPLETED) 
     for f in done: 
      await f 

loop = asyncio.get_event_loop() 
loop.run_until_complete(main())