2016-08-12 104 views
2

我想了解,是否有可能運行asyncio.Server實例,而事件循環已經運行了run_forever方法(從一個單獨的線程, 當然)。 據我所知,服務器可以啓動loop.run_until_complete(asyncio.start_server(...))await asyncio.start_server(...),如果循環已經運行。 第一種方法對我來說是不可接受的,因爲循環已經由run_forever方法運行。但是我也不能使用await表達式,因爲我要從「循環區域」之外開始它(例如,從主要方法,它不能被標記爲異步,對不對?)是否有可能運行asyncio.Server實例,而事件循環已經在運行

def loop_thread(loop): 
    asyncio.set_event_loop(loop) 
    try: 
     loop.run_forever() 
    finally: 
     loop.close() 
     print("loop clesed") 

class SchedulerTestManager: 
    def __init__(self): 
     ... 

     self.loop = asyncio.get_event_loop() 
     self.servers_loop_thread = threading.Thread(
      target=loop_thread, args=(self.loop,)) 
     ... 

    def start_test(self): 
     self.servers_loop_thread.start() 
     return self.servers_loop_thread 

    def add_router(self, router): 
     r = self.endpoint.add_router(router) 
     host = router.ConnectionParameters.Host 
     port = router.ConnectionParameters.Port 
     srv = TcpServer(host, port) 
     server_coro = asyncio.start_server(
      self.handle_connection, self.host, self.port) 
     # does not work since add_router is not async 
     # self.server = await server_coro 
     # does not work, since the loop is already running 
     # self.server = self.loop.run_until_complete(server_coro) 
     return r 


def maind(): 
    st_manager = SchedulerTestManager() 
    thread = st_manager.start_test() 
    router = st_manager.add_router(router) 

最簡單的解決方案是在開始測試(運行循環)之前添加所有路由器(服務器)。但我想嘗試實現它,所以當測試已經運行時可以添加路由器。我認爲loop.call_sooncall_soon_threadsafe)方法可以幫助我,但它似乎不能說明協程,而只是一個簡單的函數。

希望我的解釋不是很混亂。提前致謝!

回答

1

對於在一個線程中執行的事件循環與在其他線程中執行的常規舊線程代碼之間的通信,您可以使用janus庫。

這是一個有兩個接口的隊列:異步和線程安全同步。

這是用法示例:

import asyncio 
import janus 

loop = asyncio.get_event_loop() 
queue = janus.Queue(loop=loop) 

def threaded(sync_q): 
    for i in range(100): 
     sync_q.put(i) 
    sync_q.join() 

@asyncio.coroutine 
def async_coro(async_q): 
    for i in range(100): 
     val = yield from async_q.get() 
     assert val == i 
     async_q.task_done() 

fut = loop.run_in_executor(None, threaded, queue.sync_q) 
loop.run_until_complete(async_coro(queue.async_q)) 
loop.run_until_complete(fut) 

您可以創建一個循環從隊列中等待新的消息,並要求啓動新服務器的任務。其他線程可能會將新消息推入隊列以請求新的服務器。

+0

謝謝!似乎這就是我要找的。 –

相關問題