2016-03-08 58 views
3

我有一個運行過程,應該永遠運行asyncio通過超時取消異步迭代器

我可以使用ProcessIterator與該進程進行交互,ProcessIterator可以(在此處省略)將數據發送到stdin並從stdout中獲取數據。

我可以通過async for fd, data in ProcessIterator(...):訪問數據。

現在的問題是,這個異步迭代器的執行必須是有時間限制的。如果時間用完,timeout()函數被調用, 但異常不是源於__anext__函數來通知超時。

如何在異步迭代器中引發異常? 我發現沒有辦法撥打awaitable.throw(something)或類似的。

class ProcessIterator: 
    def __init__(self, process, loop, run_timeout): 
     self.process = process 
     self.loop = loop 

     self.run_timeout = run_timeout 

     # set the global timer 
     self.overall_timer = self.loop.call_later(
      self.run_timeout, self.timeout) 

    def timeout(self): 
     # XXX: how do i pass this exception into the iterator? 
     raise ProcTimeoutError(
      self.process.args, 
      self.run_timeout, 
      was_global, 
     ) 

    async def __aiter__(self): 
     return self 

    async def __anext__(self):  
     if self.process.exited: 
      raise StopAsyncIteration() 

     else: 
      # fetch output from the process asyncio.Queue() 
      entry = await self.process.output_queue.get() 
      if entry == StopIteration: 
       raise StopAsyncIteration() 

      return entry 

異步迭代器的使用,現在大致是:

async def test_coro(loop): 
    code = 'print("rofl"); time.sleep(5); print("lol")' 

    proc = Process([sys.executable, '-u', '-c', code]) 

    await proc.create() 

    try: 
     async for fd, line in ProcessIterator(proc, loop, run_timeout=1): 
      print("%d: %s" % (fd, line)) 

    except ProcessTimeoutError as exc: 
     # XXX This is the exception I'd like to get here! How can i throw it? 
     print("timeout: %s" % exc) 

    await proc.wait() 

TL;博士:我如何可以拋出一個異常定時,因此從異步迭代器起源?

回答

0

這是我現在提出的解決方案。

參見https://github.com/SFTtech/kevinkevin/process.py爲上游版本。

它還具有行計數和輸出超時,我從這個例子中剝離。

class Process: 
    def __init__(self, command, loop=None): 

     self.loop = loop or asyncio.get_event_loop() 

     self.created = False 
     self.killed = asyncio.Future() 

     self.proc = self.loop.subprocess_exec(
      lambda: WorkerInteraction(self), # see upstream repo 
      *command) 

     self.transport = None 
     self.protocol = None 

    async def create(self): 
     self.transport, self.protocol = await self.proc 

    def communicate(self, timeout): 
     if self.killed.done(): 
      raise Exception("process was already killed " 
          "and no output is waiting") 

     return ProcessIterator(self, self.loop, timeout) 

class ProcessIterator: 
    """ 
    Asynchronous iterator for the process output. 
    Use like `async for (fd, data) in ProcessIterator(...):` 
    """ 

    def __init__(self, process, loop, run_timeout): 
     self.process = process 
     self.loop = loop 
     self.run_timeout = run_timeout 

     self.overall_timer = None 

     if self.run_timeout < INF: 
      # set the global timer 
      self.overall_timer = self.loop.call_later(
       self.run_timeout, 
       functools.partial(self.timeout, was_global=True)) 

    def timeout(self): 
     if not self.process.killed.done(): 
      self.process.killed.set_exception(ProcTimeoutError(
       self.process.args, 
       self.run_timeout, 
      )) 

    async def __aiter__(self): 
     return self 

    async def __anext__(self): 
     # either the process exits, 
     # there's an exception (process killed, timeout, ...) 
     # or the queue gives us the next data item. 
     # wait for the first of those events. 
     done, pending = await asyncio.wait(
      [self.process.protocol.queue.get(), self.process.killed], 
      return_when=asyncio.FIRST_COMPLETED) 

     # at least one of them is done now: 
     for future in done: 
      # if something failed, cancel the pending futures 
      # and raise the exception 
      # this happens e.g. for a timeout. 
      if future.exception(): 
       for future_pending in pending: 
        future_pending.cancel() 

       # kill the process before throwing the error! 
       await self.process.pwn() 
       raise future.exception() 

      # fetch output from the process 
      entry = future.result() 

      # it can be stopiteration to indicate the last data chunk 
      # as the process exited on its own. 
      if entry == StopIteration: 
       if not self.process.killed.done(): 
        self.process.killed.set_result(entry) 

        # raise the stop iteration 
        await self.stop_iter(enough=False) 

      return entry 

     raise Exception("internal fail: no future was done!") 

    async def stop_iter(self): 
     # stop the timer 
     if self.overall_timer: 
      self.overall_timer.cancel() 

     retcode = self.process.returncode() 

     raise StopAsyncIteration() 

神奇功能是這樣的:

done, pending = await asyncio.wait(
    [self.process.protocol.queue.get(), self.process.killed], 
    return_when=asyncio.FIRST_COMPLETED) 

當超時發生時,隊列取出被可靠地中止。

1

編輯:加入溶液2

解決方案1:

能否timeout()回調店ProcTimeoutError例外在一個實例變量?然後__anext__()可以檢查實例變量並在設置時引發異常。

class ProcessIterator: 
    def __init__(self, process, loop, run_timeout): 
     self.process = process 
     self.loop = loop 
     self.error = None 

     self.run_timeout = run_timeout 

     # set the global timer 
     self.overall_timer = self.loop.call_later(
      self.run_timeout, self.timeout) 

    def timeout(self): 
     # XXX: set instance variable 
     self.error = ProcTimeoutError(
         self.process.args, 
         self.run_timeout, 
         was_global 
        ) 

    async def __aiter__(self): 
     return self 

    async def __anext__(self): 
     # XXX: if error is set, then raise the exception 
     if self.error: 
      raise self.error 

     elif self.process.exited: 
      raise StopAsyncIteration() 

     else: 
      # fetch output from the process asyncio.Queue() 
      entry = await self.process.output_queue.get() 
      if entry == StopIteration: 
       raise StopAsyncIteration() 

      return entry 

解決方案2:

把例外的process.output_queue。

.... 
def timeout(self): 
    # XXX: set instance variable 
    self.process.ouput_queue.put(ProcTimeoutError(
            self.process.args, 
            self.run_timeout, 
            was_global 
           )) 

.... 

# fetch output from the process asyncio.Queue() 
entry = await self.process.output_queue.get() 
if entry == StopIteration: 
    raise StopAsyncIteration() 

elif entry = ProcTimeoutError: 
    raise entry 
.... 

如果隊列中可能有條目,請使用優先級隊列。分配ProcTimeoutError的優先級高於其他條目,例如(0,ProcTimeoutError)vs(1,other_entry)。

+0

解決方案1不起作用,因爲潛在的隊列不會產生輸出,我們將永遠掛在'queue.get()'中。 解決方案2不起作用,因爲隊列可能會傳播近乎無限量的消息,這會阻止入隊的StopIteration或花費很長時間,直到它成爲下一個元素。 超時必須具有最高優先級(但使用優先隊列似乎是錯誤的)才能夠可靠地終止進程,因爲它是不可信代碼。 – TheJJ

+0

一個更好的方法可能正在等待未來和隊列,並在其中一個準備就緒時繼續,如果兩者都一樣,那麼未來是首選。當異常被立即設置爲結果時,它可以做出反應。我想我有一個想法,讓我們看看。 – TheJJ

0

您可以使用get_nowait,它會立即返回條目或丟棄QueueEmpty。在while環路包裝它與一些異步睡眠應該做的伎倆。喜歡的東西:

async def __anext__(self):  
    if self.process.exited: 
     raise StopAsyncIteration() 

    else: 
     while self.error is None: 
      try: 
       entry = self.process.output_queue.get_nowait() 
       if entry == StopIteration: 
        raise StopAsyncIteration() 
       return entry 
      except asyncio.QueueEmpty: 
       # some sleep to give back control to ioloop 
       # since we using nowait 
       await asyncio.sleep(0.1) 
     else: 
      raise self.error 

而且作爲在Tornado's Queue.get實現使用超時的提示方式:

def get(self, timeout=None): 
    """Remove and return an item from the queue. 
    Returns a Future which resolves once an item is available, or raises 
    `tornado.gen.TimeoutError` after a timeout. 
    """ 
    future = Future() 
    try: 
     future.set_result(self.get_nowait()) 
    except QueueEmpty: 
     self._getters.append(future) 
     _set_timeout(future, timeout) 
    return future 
+0

我不認爲這是一個可以接受的方式來解決這個與一些「忙」等待這樣。當循環再次處理時,應該拋出異常,而不是一次又一次嘗試。 – TheJJ

+0

你可能不同意,但這就像'asyncio.Queue.get'工程 - 忙等待https://github.com/python/asyncio/blob/master/asyncio/queues.py#L157 – kwarunek

+0

當然,它可以寫得更好,作爲我已經添加了Tornado實現的提示 – kwarunek

1

請從asyncio檢查timeout情況管理器:

with asyncio.timeout(10): 
    async for i in get_iter(): 
     process(i) 

這是不但你可以複製粘貼從asyncio master branch

+0

看起來很有希望。如果我現在可以通過在get_iter(timeout = 10)中指定並設置的超時執行此操作,那麼它就是我想要的:) – TheJJ