2015-10-01 61 views
7

假設我有以下代碼:有沒有辦法在多個線程中使用asyncio.Queue?

import asyncio 
import threading 

queue = asyncio.Queue() 

def threaded(): 
    import time 
    while True: 
     time.sleep(2) 
     queue.put_nowait(time.time()) 
     print(queue.qsize()) 

@asyncio.coroutine 
def async(): 
    while True: 
     time = yield from queue.get() 
     print(time) 

loop = asyncio.get_event_loop() 
asyncio.Task(async()) 
threading.Thread(target=threaded).start() 
loop.run_forever() 

這段代碼的問題是,裏面async協程循環永遠不會完成第一次迭代,而queue規模在不斷擴大。

這是爲什麼發生這種情況,我該如何解決這個問題?

我無法擺脫單獨的線程,因爲在我真正的代碼,我使用一個單獨的線程與串行設備進行通信,並且我還沒有找到一個方法來做到這一點使用asyncio

+1

'「我無法擺脫單獨的線程,因爲在我真正的代碼,我使用一個單獨的線程與串行設備進行通訊」' - 您是否嘗試過使用'loop.run_in_executor'做任何阻塞的相互作用串行設備? –

回答

14

asyncio.Queueis not thread-safe,所以你不能直接使用它從多個線程。相反,你可以使用janus,這是一個第三方庫,它提供了一個線程感知asyncio隊列:

import asyncio 
import threading 
import janus 

def threaded(squeue): 
    import time 
    while True: 
     time.sleep(2) 
     squeue.put_nowait(time.time()) 
     print(squeue.qsize()) 

@asyncio.coroutine 
def async(aqueue): 
    while True: 
     time = yield from aqueue.get() 
     print(time) 

loop = asyncio.get_event_loop() 
queue = janus.Queue(loop=loop) 
asyncio.Task(asyncio.ensure_future(queue.async_q)) 
threading.Thread(target=threaded, args=(queue.sync_q,)).start() 
loop.run_forever() 

還有aioprocessing(全披露:我寫的),它提供了處理安全(並作爲一個副作用,線程安全)隊列,但如果你不試圖使用multiprocessing,這是過度的。

+0

'NameError:名字 '異步' 不defined'在'asyncio.Task(異步(queue.async_q))'給出。我該怎麼辦? –

+0

@StamKaly對不起,使用'asyncio.async',甚至更好,'asyncio.ensure_future',因爲'asyncio.async'現在已經過時。 – dano

2

BaseEventLoop.call_soon_threadsafe就在眼前。詳情請參閱asyncio doc

簡單地改變你的這樣的:

def threaded(): 
    import time 
    while True: 
     time.sleep(1) 
     loop.call_soon_threadsafe(queue.put_nowait, time.time()) 
     loop.call_soon_threadsafe(lambda: print(queue.qsize())) 

下面是一個示例輸出:

0 
1443857763.3355968 
0 
1443857764.3368602 
0 
1443857765.338082 
0 
1443857766.3392274 
0 
1443857767.3403943 
1

如果你不希望使用另一個庫,你可以安排從線程協同程序。更換queue.put_nowait與以下工作正常。

asyncio.run_coroutine_threadsafe(queue.put(time.time()), loop) 

變量loop代表主線程中的事件循環。

編輯:

爲什麼你async協同程序沒有做任何事情的原因是 事件循環從來沒有給它一個機會,這樣做。隊列對象是 而非線程安全,如果您仔細查看cpython代碼,則會發現 這意味着put_nowait會通過 喚醒隊列中的使用者使用事件循環的未來方法call_soon。如果 我們可以使它使用call_soon_threadsafe它應該工作。然而,call_sooncall_soon_threadsafe之間的主要 差異是 ,call_soon_threadsafe通過調用loop._write_to_self()喚醒事件循環。所以我們自己說:

import asyncio 
import threading 

queue = asyncio.Queue() 

def threaded(): 
    import time 
    while True: 
     time.sleep(2) 
     queue.put_nowait(time.time()) 
     queue._loop._write_to_self() 
     print(queue.qsize()) 

@asyncio.coroutine 
def async(): 
    while True: 
     time = yield from queue.get() 
     print(time) 

loop = asyncio.get_event_loop() 
asyncio.Task(async()) 
threading.Thread(target=threaded).start() 
loop.run_forever() 

然後,一切都按預期工作。

至於 訪問共享對象的線程方面,asyncio.queue具有線程appendpopleftcollections.deque下使用。 也許檢查隊列不空,popleft不是原子,但如果你 消耗隊列只在一個線程(事件循環的一個) 它可能被罰款。

其他提出的解決方案,從loop.call_soon_threadsafe化做 高的回答和我的asyncio.run_coroutine_threadsafe只是在做 此,醒來事件循環。

相關問題