2016-09-26 13 views
1

我是asyncioaiohttp新手。我目前得到這個錯誤,不知道爲什麼我收到InvalidStateErrorasyncio未來RuntimeError爲我的會議:帶有asyncio futures和RuntimeError的InvalidStateError與aiohttp時使用期貨回調

Traceback (most recent call last): 
    File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/runpy.py", line 184, in _run_module_as_main 
    "__main__", mod_spec) 
    File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/runpy.py", line 85, in _run_code 
    exec(code, run_globals) 
    File "/Users/bli1/Development/QE/idea/trinity-tracer/tracer/tests/tracer.py", line 100, in <module> 
    sys.exit(main(sys.argv)) 
    File "/Users/bli1/Development/QE/idea/trinity-tracer/tracer/tests/tracer.py", line 92, in main 
    poster.post() 
    File "/Users/bli1/Development/QE/idea/trinity-tracer/tracer/utils/poster.py", line 87, in post 
    results = event_loop.run_until_complete(self.async_post_events(events)) 
    File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete 
    return future.result() 
    File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py", line 274, in result 
    raise self._exception 
    File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py", line 239, in _step 
    result = coro.send(None) 
    File "/Users/bli1/Development/QE/idea/trinity-tracer/tracer/utils/poster.py", line 79, in async_post_events 
    task.add_done_callback(self.send_oracle, task.result(), session) 
    File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py", line 268, in result 
    raise InvalidStateError('Result is not ready.') 
asyncio.futures.InvalidStateError: Result is not ready. 
Task exception was never retrieved 
future: <Task finished coro=<Poster.async_post_event() done, defined at /Users/bli1/Development/QE/idea/trinity-tracer/tracer/utils/poster.py:62> exception=RuntimeError('Session is closed',)> 
Traceback (most recent call last): 
    File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py", line 239, in _step 
    result = coro.send(None) 
    File "/Users/bli1/Development/QE/idea/trinity-tracer/tracer/utils/poster.py", line 64, in async_post_event 
    async with session.post(self.endpoint, data=event) as resp: 
    File "/Users/bli1/Development/QE/idea/trinity-tracer/lib/python3.5/site-packages/aiohttp/client.py", line 565, in __aenter__ 
    self._resp = yield from self._coro 
    File "/Users/bli1/Development/QE/idea/trinity-tracer/lib/python3.5/site-packages/aiohttp/client.py", line 161, in _request 
    raise RuntimeError('Session is closed') 
RuntimeError: Session is closed 

我所試圖做的是POST到端點,然後使用相同的事件POST編輯發佈到另一個端點。這將是跑了作爲另一async方法作爲callback

這裏是我的代碼:

async def async_post_event(self, event, session): 
     async with session.post(self.endpoint, data=event) as resp: 
      event["tracer"]["post"]["timestamp"] = time.time() * 1000.0 
      event["tracer"]["post"]["statusCode"] = await resp.status 
      return event 

    async def send_oracle(self, event, session): 
     async with session.post(self.oracle, data=event) as resp: 
      return event["event"]["event_header"]["event_id"], await resp.status 

    async def async_post_events(self, events): 
     tasks = [] 
     conn = aiohttp.TCPConnector(verify_ssl=self.secure) 
     async with aiohttp.ClientSession(connector=conn) as session: 
      for event in events: 
       task = asyncio.ensure_future(self.async_post_event(event, session)) 
       task.add_done_callback(self.send_oracle, task.result(), session) 
       tasks.append(task) 
     return await asyncio.gather(*tasks) 

    def post(self): 
     event_loop = asyncio.get_event_loop() 
     try: 
      events = [self.gen_random_event() for i in range(self.num_post)] 
      results = event_loop.run_until_complete(self.async_post_events(events)) 
      print(results) 
     finally: 
      event_loop.close() 

回答

2

add_done_callback接受回調,不是協程。 此外,它是非常低級別的API的一部分,應由臨時開發人員避免。

但是你的主要錯誤是在ClientSession異步上下文管理器之外調用session.post(),堆棧跟蹤明確地指向它。

我修改您的代碼段爲得到的東西看起來像一個工作代碼:

async def async_post_event(self, event, session): 
    async with session.post(self.endpoint, data=event) as resp: 
     event["tracer"]["post"]["timestamp"] = time.time() * 1000.0 
     event["tracer"]["post"]["statusCode"] = await resp.status 
    async with session.post(self.oracle, data=event) as resp: 
     return event["event"]["event_header"]["event_id"], await resp.status 

async def async_post_events(self, events): 
    coros = [] 
    conn = aiohttp.TCPConnector(verify_ssl=self.secure) 
    async with aiohttp.ClientSession(connector=conn) as session: 
     for event in events: 
      coros.append(self.async_post_event(event, session)) 
     return await asyncio.gather(*coros) 

def post(self): 
    event_loop = asyncio.get_event_loop() 
    try: 
     events = [self.gen_random_event() for i in range(self.num_post)] 
     results = event_loop.run_until_complete(self.async_post_events(events)) 
     print(results) 
    finally: 
     event_loop.close() 

你可以從async_post_event提取2個職位成單獨的協同程序,但主要的想法是一樣的。

+0

謝謝你的幫助!得到它的工作! = d – Liondancer