我有一個運行過程,應該永遠運行asyncio
。通過超時取消異步迭代器
我可以使用ProcessIterator與該進程進行交互,ProcessIterator可以(在此處省略)將數據發送到stdin並從stdout中獲取數據。
我可以通過async for fd, data in ProcessIterator(...):
訪問數據。
現在的問題是,這個異步迭代器的執行必須是有時間限制的。如果時間用完,timeout()
函數被調用, 但異常不是源於__anext__
函數來通知超時。
如何在異步迭代器中引發異常? 我發現沒有辦法撥打awaitable.throw(something)
或類似的。
class ProcessIterator:
def __init__(self, process, loop, run_timeout):
self.process = process
self.loop = loop
self.run_timeout = run_timeout
# set the global timer
self.overall_timer = self.loop.call_later(
self.run_timeout, self.timeout)
def timeout(self):
# XXX: how do i pass this exception into the iterator?
raise ProcTimeoutError(
self.process.args,
self.run_timeout,
was_global,
)
async def __aiter__(self):
return self
async def __anext__(self):
if self.process.exited:
raise StopAsyncIteration()
else:
# fetch output from the process asyncio.Queue()
entry = await self.process.output_queue.get()
if entry == StopIteration:
raise StopAsyncIteration()
return entry
異步迭代器的使用,現在大致是:
async def test_coro(loop):
code = 'print("rofl"); time.sleep(5); print("lol")'
proc = Process([sys.executable, '-u', '-c', code])
await proc.create()
try:
async for fd, line in ProcessIterator(proc, loop, run_timeout=1):
print("%d: %s" % (fd, line))
except ProcessTimeoutError as exc:
# XXX This is the exception I'd like to get here! How can i throw it?
print("timeout: %s" % exc)
await proc.wait()
TL;博士:我如何可以拋出一個異常定時,因此從異步迭代器起源?
解決方案1不起作用,因爲潛在的隊列不會產生輸出,我們將永遠掛在'queue.get()'中。 解決方案2不起作用,因爲隊列可能會傳播近乎無限量的消息,這會阻止入隊的StopIteration或花費很長時間,直到它成爲下一個元素。 超時必須具有最高優先級(但使用優先隊列似乎是錯誤的)才能夠可靠地終止進程,因爲它是不可信代碼。 – TheJJ
一個更好的方法可能正在等待未來和隊列,並在其中一個準備就緒時繼續,如果兩者都一樣,那麼未來是首選。當異常被立即設置爲結果時,它可以做出反應。我想我有一個想法,讓我們看看。 – TheJJ