我與中信高科工作後,但我有點卡住了。我打電話給3個不同的API,每個都有自己的響應時間。打印響應部分完成Python的異步事件循環,同時還完成任務的響應
我想創建一個超時功能,它爲每個任務返回一個可接受的時間。但是如果時間任務在可接受的時間內沒有完成,我想返回部分數據,因爲我不需要一個完整的數據集,速度更關注。
不過,我想保持未完成的任務工作,直到完成(即請求API數據插入到一個Postgres數據庫。
我想知道,如果我們能做到這一點,而無需使用某種調度到保持背景中運行的任務。
我與中信高科工作後,但我有點卡住了。我打電話給3個不同的API,每個都有自己的響應時間。打印響應部分完成Python的異步事件循環,同時還完成任務的響應
我想創建一個超時功能,它爲每個任務返回一個可接受的時間。但是如果時間任務在可接受的時間內沒有完成,我想返回部分數據,因爲我不需要一個完整的數據集,速度更關注。
不過,我想保持未完成的任務工作,直到完成(即請求API數據插入到一個Postgres數據庫。
我想知道,如果我們能做到這一點,而無需使用某種調度到保持背景中運行的任務。
但如果時間任務不是可接受的時間內完成,我想 返回部分數據,因爲我並不需要一個完整的數據集和速度 更多的是關注的焦點。
但是,我想保持未完成的任務,直到完成
所以其他任務是獨立於超時任務的狀態,對嗎?如果我正確地理解了你,你只想用他們自己的超時運行3 asyncio.Task
,並在最後彙總他們的結果。
唯一可能的問題,我看到的是「想返回部分數據」,因爲它很可能的事情如何組織有所不同,但我們可能只需要通過這個「部分數據」裏面的任務上調超時被取消例外。
這裏的小原型:
import asyncio
class PartialData(Exception):
def __init__(self, data):
super().__init__()
self.data = data
async def api_job(i):
data = 'job {i}:'.format(i=i)
try:
await asyncio.sleep(1)
data += ' step 1,'
await asyncio.sleep(2)
data += ' step 2,'
await asyncio.sleep(2)
data += ' step 3.'
except asyncio.CancelledError as exc:
raise PartialData(data) # Pass partial data to outer code with our exception.
else:
return data
async def api_task(i, timeout):
"""Wrapper for api_job to run it with timeout and retrieve it's partial data on timeout."""
t = asyncio.ensure_future(api_job(i))
try:
await asyncio.wait_for(t, timeout)
except asyncio.TimeoutError:
try:
await t
except PartialData as exc:
return exc.data # retrieve partial data on timeout and return it.
else:
return t.result()
async def main():
# Run 3 jobs with different timeouts:
results = await asyncio.gather(
api_task(1, timeout=2),
api_task(2, timeout=4),
api_task(3, timeout=6),
)
# Print results including "partial data":
for res in results:
print(res)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
輸出:
job 1: step 1,
job 2: step 1, step 2,
job 3: step 1, step 2, step 3.
(你可以看到前兩個作業完成了超時和檢索他們的DATAS中的一部分)
UPD:
複雜的例子包含可能的解決方案不同的事件:
import asyncio
from contextlib import suppress
async def stock1(_):
await asyncio.sleep(1)
return 'stock1 res'
async def stock2(exception_in_2):
await asyncio.sleep(1)
if exception_in_2:
raise ValueError('Exception in stock2!')
await asyncio.sleep(1)
return 'stock2 res'
async def stock3(_):
await asyncio.sleep(3)
return 'stock3 res'
async def main():
# Vary this values to see different situations:
timeout = 2.5
exception_in_2 = False
# To run all three stocks just create tasks for them:
tasks = [
asyncio.ensure_future(s(exception_in_2))
for s
in (stock1, stock2, stock3)
]
# Now we just wait until one of this possible situations happened:
# 1) Everything done
# 2) Exception occured in one of tasks
# 3) Timeout occured and at least two tasks ready
# 4) Timeout occured and less than two tasks ready
# (https://docs.python.org/3/library/asyncio-task.html#asyncio.wait)
await asyncio.wait(
tasks,
timeout=timeout,
return_when=asyncio.FIRST_EXCEPTION
)
is_success = all(t.done() and not t.exception() for t in tasks)
is_exception = any(t.done() and t.exception() for t in tasks)
is_good_timeout = \
not is_success and \
not is_exception and \
sum(t.done() for t in tasks) >= 2
is_bad_timeout = \
not is_success and \
not is_exception and \
sum(t.done() for t in tasks) < 2
# If success, just print all results:
if is_success:
print('All done before timeout:')
for t in tasks:
print(t.result())
# If timeout, but at least two done,
# print it leaving pending task to be executing.
# But note two important things:
# 1) You should guarantee pending task done before loop closed
# 2) What if pending task will finish with error, is it ok?
elif is_good_timeout:
print('Timeout, but enought tasks done:')
for t in tasks:
if t.done():
print(t.result())
# Timeout and not enought tasks done,
# let's just cancel all hanging:
elif is_bad_timeout:
await cancel_and_retrieve(tasks)
raise RuntimeError('Timeout and not enought tasks done') # You probably want indicate fail
# If any of tasks is finished with an exception,
# we should probably cancel unfinished tasks,
# await all tasks done and retrive all exceptions to prevent warnings
# (https://docs.python.org/3/library/asyncio-dev.html#detect-exceptions-never-consumed)
elif is_exception:
await cancel_and_retrieve(tasks)
raise RuntimeError('Exception in one of tasks') # You probably want indicate fail
async def cancel_and_retrieve(tasks):
"""
Cancel all pending tasks, retrieve all exceptions
(https://docs.python.org/3/library/asyncio-dev.html#detect-exceptions-never-consumed)
It's cleanup function if we don't want task being continued.
"""
for t in tasks:
if not t.done():
t.cancel()
await asyncio.wait(
tasks,
return_when=asyncio.ALL_COMPLETED
)
for t in tasks:
with suppress(Exception):
await t
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
# If some tasks still pending (is_good_timeout case),
# let's kill them:
loop.run_until_complete(
cancel_and_retrieve(asyncio.Task.all_tasks())
)
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
讓我們用這個邏輯一秒鐘,讓我們說你正在顯示股票。有3只股票。你要求股票的價格,但股票2需要更長的時間。而不是等待,我們只顯示股票價格1和3. –
@StandardCitizen如果股票2在1或3之前到達,我們做什麼? –
感謝您的回覆,讓我們在允許的時間內說明2次到達並輸入數據庫。第三場比賽時間太長。因此,我們將返回請求,但第三個進程將保持打開狀態並執行該任務直至完成。希望在刷新用戶之後,它就會完成。可以說這些股票每天只開一次。 –
你熟悉的聽衆/觀察者模式?它經常用於異步回調,可以在任何時候返回 –
不,但是你有任何鏈接嗎? –
谷歌搜索'Python觀察者模式'出現了這個:http://www.giantflyingsaucer.com/blog/?p=5117。否則,只需要尋找幫助你的教程 –