3
作爲Python新的asyncio模塊的實驗,我創建了以下片段來處理後臺工作程序中的一組長時間運行的動作(作業)。限制在後臺線程中使用信號量同步運行異步協程
爲了控制同時運行的作業的數量,我在中引入了一個信號量(第56行)。然而,隨着信號量的到位,似乎所獲得的鎖定從未被釋放,因爲完成後(執行回調),等待的作業不會開始。當我溝通與塊,一切都按預期工作。
import asyncio
from queue import Queue, Empty
from datetime import datetime
from threading import Thread
class BackgroundWorker(Thread):
def __init__(self):
super().__init__()
self._keep_running = True
self._waiting_coros = Queue()
self._tasks = []
self._loop = None # Loop must be initialized in child thread.
self.limit_simultaneous_processes = asyncio.Semaphore(2)
def stop(self):
self._keep_running = False
def run(self):
self._loop = asyncio.new_event_loop() # Implicit creation of the loop only happens in the main thread.
asyncio.set_event_loop(self._loop) # Since this is a child thread, we need to do in manually.
self._loop.run_until_complete(self.process_coros())
def submit_coro(self, coro, callback=None):
self._waiting_coros.put((coro, callback))
@asyncio.coroutine
def process_coros(self):
while self._keep_running:
try:
while True:
coro, callback = self._waiting_coros.get_nowait()
task = asyncio.async(coro())
if callback:
task.add_done_callback(callback)
self._tasks.append(task)
except Empty as e:
pass
yield from asyncio.sleep(3) # sleep so the other tasks can run
background_worker = BackgroundWorker()
class Job(object):
def __init__(self, idx):
super().__init__()
self._idx = idx
def process(self):
background_worker.submit_coro(self._process, self._process_callback)
@asyncio.coroutine
def _process(self):
with (yield from background_worker.limit_simultaneous_processes):
print("received processing slot %d" % self._idx)
start = datetime.now()
yield from asyncio.sleep(2)
print("processing %d took %s" % (self._idx, str(datetime.now() - start)))
def _process_callback(self, future):
print("callback %d triggered" % self._idx)
def main():
print("starting worker...")
background_worker.start()
for idx in range(10):
download_job = Job(idx)
download_job.process()
command = None
while command != "quit":
command = input("enter 'quit' to stop the program: \n")
print("stopping...")
background_worker.stop()
background_worker.join()
if __name__ == '__main__':
main()
任何人都可以幫我解釋一下這種行爲嗎?當與塊被清除時,爲什麼不是信號量增加?
我不明白您的問題:信號計數減少,預期在你的代碼示例增加。 – 2015-02-11 16:36:12
您是否運行過代碼?如果我這樣做,前兩個作業的回調被執行,但循環中的其他協程不會開始執行。我認爲他們仍然在等待從信號量獲得鎖定,但我可能在這裏是錯誤的。也許他們在等別的東西? – ImapUkua 2015-02-11 18:41:55