2017-12-27 891 views
2

我與中信高科工作後,但我有點卡住了。我打電話給3個不同的API,每個都有自己的響應時間。打印響應部分完成Python的異步事件循環,同時還完成任務的響應

我想創建一個超時功能,它爲每個任務返回一個可接受的時間。但是如果時間任務在可接受的時間內沒有完成,我想返回部分數據,因爲我不需要一個完整的數據集,速度更關注。

不過,我想保持未完成的任務工作,直到完成(即請求API數據插入到一個Postgres數據庫。

我想知道,如果我們能做到這一點,而無需使用某種調度到保持背景中運行的任務。

+0

你熟悉的聽衆/觀察者模式?它經常用於異步回調,可以在任何時候返回 –

+0

不,但是你有任何鏈接嗎? –

+0

谷歌搜索'Python觀察者模式'出現了這個:http://www.giantflyingsaucer.com/blog/?p=5117。否則,只需要尋找幫助你的教程 –

回答

1

但如果時間任務不是可接受的時間內完成,我想 返回部分數據,因爲我並不需要一個完整的數據集和速度 更多的是關注的焦點。

但是,我想保持未完成的任務,直到完成

所以其他任務是獨立於超時任務的狀態,對嗎?如果我正確地理解了你,你只想用他們自己的超時運行3 asyncio.Task,並在最後彙總他們的結果。

唯一可能的問題,我看到的是「想返回部分數據」,因爲它很可能的事情如何組織有所不同,但我們可能只需要通過這個「部分數據」裏面的任務上調超時被取消例外。

這裏的小原型:

import asyncio 


class PartialData(Exception): 
    def __init__(self, data): 
     super().__init__() 
     self.data = data   


async def api_job(i): 
    data = 'job {i}:'.format(i=i) 
    try: 
     await asyncio.sleep(1) 
     data += ' step 1,' 
     await asyncio.sleep(2) 
     data += ' step 2,' 
     await asyncio.sleep(2) 
     data += ' step 3.' 
    except asyncio.CancelledError as exc: 
     raise PartialData(data) # Pass partial data to outer code with our exception. 
    else: 
     return data 


async def api_task(i, timeout): 
    """Wrapper for api_job to run it with timeout and retrieve it's partial data on timeout.""" 
    t = asyncio.ensure_future(api_job(i)) 
    try: 
     await asyncio.wait_for(t, timeout) 
    except asyncio.TimeoutError: 
     try: 
      await t 
     except PartialData as exc: 
      return exc.data # retrieve partial data on timeout and return it. 
    else: 
     return t.result() 


async def main(): 
    # Run 3 jobs with different timeouts: 
    results = await asyncio.gather(
     api_task(1, timeout=2), 
     api_task(2, timeout=4), 
     api_task(3, timeout=6), 
    ) 

    # Print results including "partial data": 
    for res in results: 
     print(res) 


if __name__ == '__main__': 
    loop = asyncio.get_event_loop() 
    try: 
     loop.run_until_complete(main()) 
    finally: 
     loop.run_until_complete(loop.shutdown_asyncgens()) 
     loop.close() 

輸出:

job 1: step 1, 
job 2: step 1, step 2, 
job 3: step 1, step 2, step 3. 

(你可以看到前兩個作業完成了超時和檢索他們的DATAS中的一部分)

UPD:

複雜的例子包含可能的解決方案不同的事件:

import asyncio 
from contextlib import suppress 


async def stock1(_): 
    await asyncio.sleep(1) 
    return 'stock1 res' 

async def stock2(exception_in_2): 
    await asyncio.sleep(1) 
    if exception_in_2: 
     raise ValueError('Exception in stock2!') 
    await asyncio.sleep(1) 
    return 'stock2 res' 

async def stock3(_): 
    await asyncio.sleep(3) 
    return 'stock3 res' 


async def main(): 
    # Vary this values to see different situations: 
    timeout = 2.5 
    exception_in_2 = False 


    # To run all three stocks just create tasks for them: 
    tasks = [ 
     asyncio.ensure_future(s(exception_in_2)) 
     for s 
     in (stock1, stock2, stock3) 
    ] 


    # Now we just wait until one of this possible situations happened: 
    # 1) Everything done 
    # 2) Exception occured in one of tasks 
    # 3) Timeout occured and at least two tasks ready 
    # 4) Timeout occured and less than two tasks ready 
    # (https://docs.python.org/3/library/asyncio-task.html#asyncio.wait) 
    await asyncio.wait(
     tasks, 
     timeout=timeout, 
     return_when=asyncio.FIRST_EXCEPTION 
    ) 

    is_success = all(t.done() and not t.exception() for t in tasks) 
    is_exception = any(t.done() and t.exception() for t in tasks) 
    is_good_timeout = \ 
     not is_success and \ 
     not is_exception and \ 
     sum(t.done() for t in tasks) >= 2 
    is_bad_timeout = \ 
     not is_success and \ 
     not is_exception and \ 
     sum(t.done() for t in tasks) < 2 


    # If success, just print all results: 
    if is_success: 
     print('All done before timeout:') 
     for t in tasks: 
      print(t.result()) 
    # If timeout, but at least two done, 
    # print it leaving pending task to be executing. 
    # But note two important things: 
    # 1) You should guarantee pending task done before loop closed 
    # 2) What if pending task will finish with error, is it ok? 
    elif is_good_timeout: 
     print('Timeout, but enought tasks done:') 
     for t in tasks: 
      if t.done(): 
       print(t.result()) 
    # Timeout and not enought tasks done, 
    # let's just cancel all hanging:  
    elif is_bad_timeout: 
     await cancel_and_retrieve(tasks) 
     raise RuntimeError('Timeout and not enought tasks done') # You probably want indicate fail 
    # If any of tasks is finished with an exception, 
    # we should probably cancel unfinished tasks, 
    # await all tasks done and retrive all exceptions to prevent warnings 
    # (https://docs.python.org/3/library/asyncio-dev.html#detect-exceptions-never-consumed) 
    elif is_exception: 
     await cancel_and_retrieve(tasks) 
     raise RuntimeError('Exception in one of tasks') # You probably want indicate fail 


async def cancel_and_retrieve(tasks): 
    """ 
    Cancel all pending tasks, retrieve all exceptions 
    (https://docs.python.org/3/library/asyncio-dev.html#detect-exceptions-never-consumed) 
    It's cleanup function if we don't want task being continued. 
    """ 
    for t in tasks: 
     if not t.done(): 
      t.cancel() 
    await asyncio.wait(
     tasks, 
     return_when=asyncio.ALL_COMPLETED 
    ) 
    for t in tasks: 
     with suppress(Exception): 
      await t 


if __name__ == '__main__': 
    loop = asyncio.get_event_loop() 
    try: 
     loop.run_until_complete(main()) 
    finally: 

     # If some tasks still pending (is_good_timeout case), 
     # let's kill them: 
     loop.run_until_complete(
      cancel_and_retrieve(asyncio.Task.all_tasks()) 
     ) 

     loop.run_until_complete(loop.shutdown_asyncgens()) 
     loop.close() 
+0

讓我們用這個邏輯一秒鐘,讓我們說你正在顯示股票。有3只股票。你要求股票的價格,但股票2需要更長的時間。而不是等待,我們只顯示股票價格1和3. –

+0

@StandardCitizen如果股票2在1或3之前到達,我們做什麼? –

+0

感謝您的回覆,讓我們在允許的時間內說明2次到達並輸入數據庫。第三場比賽時間太長。因此,我們將返回請求,但第三個進程將保持打開狀態並執行該任務直至完成。希望在刷新用戶之後,它就會完成。可以說這些股票每天只開一次。 –