2015-04-05 37 views
3

我想利用Python的新asyncio庫發送異步HTTP請求。我希望在發送每個請求之前等待幾個毫秒(timeout變量) - 但是當然 - 將它們全部異步發送,而不是在發送每個請求之後等待響應。Python的asyncio同步工作

我做類似如下:

@asyncio.coroutine 
def handle_line(self, line, destination): 
    print("Inside! line {} destination {}".format(line, destination)) 
    response = yield from aiohttp.request('POST', destination, data=line, 
           headers=tester.headers) 
    print(response.status) 
    return (yield from response.read()) 

@asyncio.coroutine 
def send_data(self, filename, timeout): 
    destination='foo' 
    logging.log(logging.DEBUG, 'sending_data') 
    with open(filename) as log_file: 
     for line in log_file: 
      try: 
       json_event = json.loads(line) 
      except ValueError as e: 
       print("Error parsing json event") 
      time.sleep(timeout) 
      yield from asyncio.async(self.handle_line(json.dumps(json_event), destination)) 


loop=asyncio.get_event_loop().run_until_complete(send_data('foo.txt', 1)) 

,我獲得(通過打印200級響應)是這樣的代碼是同步運行的輸出。我究竟做錯了什麼?

回答

5

有幾個問題在這裏:

  1. 你應該使用asyncio.sleep,不time.sleep,因爲後者將阻止事件循環。

  2. 你不應該使用yield fromasyncio.async(self.handle_line(...))電話後,因爲那樣會使腳本塊,直到self.handle_line協程完成,這意味着你最終不會同時做任何事情;你處理每一行,等待處理完成,然後繼續下一行。相反,您應該在不等待的情況下運行所有​​asyncio.async調用,將Task對象返回到列表中,然後使用asyncio.wait等待它們全部完成後,全部啓動它們。

把所有在一起:

@asyncio.coroutine 
def handle_line(self, line, destination): 
    print("Inside! line {} destination {}".format(line, destination)) 
    response = yield from aiohttp.request('POST', destination, data=line, 
           headers=tester.headers) 
    print(response.status) 
    return (yield from response.read()) 

@asyncio.coroutine 
def send_data(self, filename, timeout): 
    destination='foo' 
    logging.log(logging.DEBUG, 'sending_data') 
    tasks = [] 
    with open(filename) as log_file: 
     for line in log_file: 
      try: 
       json_event = json.loads(line) 
      except ValueError as e: 
       print("Error parsing json event") 
      yield from asyncio.sleep(timeout) 
      tasks.append(asyncio.async(
       self.handle_line(json.dumps(json_event), destination)) 
    yield from asyncio.wait(tasks) 


asyncio.get_event_loop().run_until_complete(send_data('foo.txt', 1)) 
+0

嗯...... 謝謝。 但我不確定這正是我想要的。 asyncio.sleep的收益將在將任務附加到任務列表之前等待超時。 asyncio.wait的收益率將同時發送所有請求。 我希望它們能夠在每個請求之間的超時間隔之後一個接一個地發送。該代碼不會在每個請求之間等待。它會一次發送它們。 – OhadBasan 2015-04-06 14:26:46

+1

@OhadBasan asyncio.sleep(timeout)行的'yield將使得代碼在啓動每個任務之間等待'timeout'秒,而不僅僅是在將它附加到任務列表之間。這個任務實際上被添加到事件循環中,並且在您調用'asyncio.async(task())'時立即啓動。你不需要從任務中「放棄」它開始執行。 – dano 2015-04-06 14:29:21

+2

@OhadBasan它可以幫助我們認識到從'產出',而不是啓動一個協程,而是等待它的完成。 – 2015-06-03 00:30:37