3

我想運行一個服務,使用協程和多線程來請求URL。但是我不能把協程去傳給執行者的工作人員。請參閱下面的代碼針對此問題的一個小例子:爲什麼協程不能與run_in_executor一起使用?

import time 
import asyncio 
import concurrent.futures 

EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=5) 

async def async_request(loop): 
    await asyncio.sleep(3) 

def sync_request(_): 
    time.sleep(3) 

async def main(loop): 
    futures = [loop.run_in_executor(EXECUTOR, async_request,loop) 
       for x in range(10)] 

    await asyncio.wait(futures) 

loop = asyncio.get_event_loop() 
loop.run_until_complete(main(loop)) 

下面的錯誤而導致:

Traceback (most recent call last): 
    File "co_test.py", line 17, in <module> 
    loop.run_until_complete(main(loop)) 
    File "/usr/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete 
    return future.result() 
    File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result 
    raise self._exception 
    File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step 
    result = coro.send(None) 
    File "co_test.py", line 10, in main 
    futures = [loop.run_in_executor(EXECUTOR, req,loop) for x in range(10)] 
    File "co_test.py", line 10, in <listcomp> 
    futures = [loop.run_in_executor(EXECUTOR, req,loop) for x in range(10)] 
    File "/usr/lib/python3.5/asyncio/base_events.py", line 541, in run_in_executor 
    raise TypeError("coroutines cannot be used with run_in_executor()") 
TypeError: coroutines cannot be used with run_in_executor() 

我知道,我可以使用,而不是async_requestsync_request funcion,在這種情況下我會通過將阻塞函數發送到另一個線程來進行協程。

我也知道我可以在事件循環中調用async_request十次。就像下面的代碼:

loop = asyncio.get_event_loop() 
futures = [async_request(loop) for i in range(10)] 
loop.run_until_complete(asyncio.wait(futures)) 

但在這種情況下,我會使用一個單一的線程。

我怎麼能使用這兩種方案,在多線程內工作的協程?正如你通過代碼所看到的那樣,我將(不使用)pool傳遞給async_request,希望我可以編寫一些告訴工作者創造未來的東西,將其發送給池並異步(釋放工作者)等待爲結果。

我想這樣做的原因是爲了使應用程序可擴展。這是不必要的一步嗎?我應該只是每個網址有一個線程,就是這樣嗎?例如:

LEN = len(list_of_urls) 
EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=LEN) 

夠好嗎?

+2

我真的不知道爲什麼你要做到這一點。實際上,你將爲每個協程運行一個獨特的循環,這會破壞使用事件循環的全部目的。你應該堅持asyncio,堅持線程,或者,如果你覺得由於某種原因這些都不夠用,請嘗試多處理。 – dirn

回答

2

你必須創建和設置線程上下文的新事件循環來運行協程:

import asyncio 
from concurrent.futures import ThreadPoolExecutor 


def run(corofn, *args): 
    loop = asyncio.new_event_loop() 
    try: 
     coro = corofn(*args) 
     asyncio.set_event_loop(loop) 
     return loop.run_until_complete(coro) 
    finally: 
     loop.close() 


async def main(): 
    loop = asyncio.get_event_loop() 
    executor = ThreadPoolExecutor(max_workers=5) 
    futures = [ 
     loop.run_in_executor(executor, run, asyncio.sleep, 1, x) 
     for x in range(10)] 
    print(await asyncio.gather(*futures)) 
    # Prints: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] 


if __name__ == '__main__': 
    loop = asyncio.get_event_loop() 
    loop.run_until_complete(main()) 
+0

我不認爲這個回覆做我想要的。運行此操作仍需要2秒鐘。我期待所有的線程共享相同的事件池。也許我不清楚我的問題。 – zeh

+2

@zeh事件循環意味着線程特定,因爲'asyncio'是關於協作式多任務處理(與搶先式多任務相反,就像線程模型一樣)。循環負責在不同任務之間切換上下文,以便一次只運行其中一個任務。使用線程會挫敗這個目的。 – Vincent

+0

我認爲每個線程應該有一個循環,其中可以執行多個協程,如下所示: 'futures = [asyncio.run_coroutine_threadsafe(corofn(t,ix),loops [i])' 我是線程號碼。也許我們可以爲此得到一個工作示例。 這裏還有一個有趣的話題:https://stackoverflow.com/questions/32059732/send-asyncio-tasks-to-loop-running-in-other-thread –

相關問題