3

作爲對 Python 新的 asyncio 模塊的實驗,我編寫了以下代碼片段,用來在後臺工作線程中處理一組長時間運行的動作(作業)。問題:在後臺線程中使用信號量限制同時運行的異步協程數量。

爲了控制同時運行的作業的數量,我在 with 塊中引入了一個信號量(第56行)。然而,信號量就位之後,所獲得的鎖似乎從未被釋放,因爲在前面的作業完成後(回調已執行),等待中的作業不會開始。當我把 with 塊註釋掉時,一切都按預期工作。

import asyncio 

from queue import Queue, Empty 
from datetime import datetime 
from threading import Thread 


class BackgroundWorker(Thread):
    """Thread that runs submitted coroutines on an event loop it creates in run()."""

    def __init__(self):
     """Create the submission queue and bookkeeping; the loop itself is created in run()."""
     super().__init__()
     self._keep_running = True       # cleared by stop() to end process_coros()
     self._waiting_coros = Queue()   # (coro, callback) pairs handed over by submit_coro()
     self._tasks = []                # every task scheduled so far
     self._loop = None # Loop must be initialized in child thread.
     # NOTE(review): the semaphore is constructed here, i.e. in whichever thread
     # instantiates the worker, before run() has created the worker's own loop.
     # On asyncio versions that bind a Semaphore to the current event loop at
     # construction time this ties it to a different loop than the one the jobs
     # run on, which would explain waiters never being woken - confirm against
     # the targeted Python version.
     self.limit_simultaneous_processes = asyncio.Semaphore(2)

    def stop(self):
     """Ask process_coros() to finish; it checks the flag once per polling pass."""
     self._keep_running = False

    def run(self):
     """Thread body: create this thread's event loop and run process_coros() on it."""
     self._loop = asyncio.new_event_loop()  # Implicit creation of the loop only happens in the main thread.
     asyncio.set_event_loop(self._loop)   # Since this is a child thread, we need to do it manually.
     self._loop.run_until_complete(self.process_coros())

    def submit_coro(self, coro, callback=None):
     """Queue ``coro`` (a callable that is later invoked with no arguments) and an optional done-callback."""
     self._waiting_coros.put((coro, callback))

    @asyncio.coroutine
    def process_coros(self):
     """Poll the submission queue until stop() is called.

     Each pass drains every queued (coro, callback) pair, schedules the
     result of ``coro()`` as a task, attaches the callback when one was
     given, records the task in ``self._tasks`` and then sleeps 3 seconds.
     """
     while self._keep_running:
      try:
       while True:
        coro, callback = self._waiting_coros.get_nowait()
        # NOTE(review): ``async`` is a reserved keyword from Python 3.7 on,
        # so this spelling only parses on older interpreters.
        task = asyncio.async(coro())
        if callback:
         task.add_done_callback(callback)
        self._tasks.append(task)
      except Empty as e:
       # Queue drained: nothing more to schedule in this pass.
       pass
      yield from asyncio.sleep(3)  # sleep so the other tasks can run


# Module-level worker used by every Job below; it is constructed at import
# time and only started later, from main().
background_worker = BackgroundWorker()


class Job(object):
    """A numbered unit of work that is executed on the shared ``background_worker``."""

    def __init__(self, idx):
        super().__init__()
        # Only used to label this job's console output.
        self._idx = idx

    def process(self):
        """Hand this job's coroutine function and its done-callback to the worker."""
        coro_factory, on_done = self._process, self._process_callback
        background_worker.submit_coro(coro_factory, on_done)

    @asyncio.coroutine
    def _process(self):
        """Take one slot of the worker's semaphore, wait 2 seconds, report the elapsed time."""
        slots = background_worker.limit_simultaneous_processes
        # The slot is held for the whole body of the ``with`` block.
        with (yield from slots):
            print("received processing slot %d" % self._idx)
            began = datetime.now()
            yield from asyncio.sleep(2)
            elapsed = datetime.now() - began
            print("processing %d took %s" % (self._idx, str(elapsed)))

    def _process_callback(self, future):
        """Done-callback for the task; ``future`` is received but not inspected."""
        print("callback %d triggered" % self._idx)


def main():
    """Start the worker, submit ten jobs, then block until the user types 'quit'."""
    print("starting worker...")
    background_worker.start()

    # map() is lazy, so each Job is still created right before it is submitted.
    for job in map(Job, range(10)):
        job.process()

    # Keep prompting until the exact word "quit" is entered.
    while True:
        if input("enter 'quit' to stop the program: \n") == "quit":
            break

    print("stopping...")
    background_worker.stop()
    background_worker.join()


# Run the demo only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()

任何人都可以幫我解釋一下這種行爲嗎?當 with 塊退出時,爲什麼信號量沒有增加?

+0

我不明白您的問題:在你的代碼示例中,信號量計數按預期減少和增加。 – 2015-02-11 16:36:12

+0

您是否運行過代碼?如果我這樣做,前兩個作業的回調被執行,但循環中的其他協程不會開始執行。我認爲他們仍然在等待從信號量獲得鎖定,但我可能在這裏是錯誤的。也許他們在等別的東西? – ImapUkua 2015-02-11 18:41:55

回答

4

我發現了這個bug:信號量是用主線程中的隱式事件循環來初始化的,而不是用線程以run()開始時顯式設置的事件循環。

修正版本:

import asyncio 

from queue import Queue, Empty 
from datetime import datetime 
from threading import Thread 


class BackgroundWorker(Thread):
    """Thread that owns a private asyncio event loop and runs submitted coroutines on it.

    Other threads hand work over through a thread-safe queue; the loop thread
    polls that queue and schedules every entry as a task.

    Args:
        max_concurrent: Initial value of ``limit_simultaneous_processes``, the
            semaphore jobs may acquire to cap how many of them run at once.
        poll_interval: Seconds to sleep between two polls of the submission
            queue. This is also the worst-case delay before ``stop()`` is
            noticed.
    """

    def __init__(self, max_concurrent=2, poll_interval=3):
        super().__init__()
        self._keep_running = True
        self._waiting_coros = Queue()
        self._tasks = []  # strong references to the tasks that are still pending
        self._max_concurrent = max_concurrent
        self._poll_interval = poll_interval
        self._loop = None  # Loop must be initialized in child thread.
        # Semaphore must be initialized after the loop is set, see run().
        self.limit_simultaneous_processes = None

    def stop(self):
        """Ask the worker to finish; takes effect at the next poll of the queue."""
        self._keep_running = False

    def run(self):
        """Thread body: create the loop, then the semaphore, then poll until stopped."""
        # Implicit creation of the loop only happens in the main thread, so a
        # child thread has to create and install its own loop manually.
        self._loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self._loop)
        # Created only now so that it belongs to this thread's loop.
        self.limit_simultaneous_processes = asyncio.Semaphore(self._max_concurrent)
        try:
            self._loop.run_until_complete(self.process_coros())
        finally:
            # Release the loop's OS resources once polling has ended.
            self._loop.close()

    def submit_coro(self, coro, callback=None):
        """Queue work for the loop thread; safe to call from any thread.

        Args:
            coro: Callable invoked with no arguments on the loop thread; it
                must return a coroutine (or other awaitable) to schedule.
            callback: Optional done-callback, called with the finished task.
        """
        self._waiting_coros.put((coro, callback))

    async def process_coros(self):
        """Schedule queued work every ``poll_interval`` seconds until stopped."""
        while self._keep_running:
            self._schedule_waiting_coros()
            # Sleep so the scheduled tasks get a chance to run.
            await asyncio.sleep(self._poll_interval)

    def _schedule_waiting_coros(self):
        """Turn every currently queued (coro, callback) pair into a task."""
        while True:
            try:
                coro, callback = self._waiting_coros.get_nowait()
            except Empty:
                return
            # ensure_future replaces ``asyncio.async``, which no longer parses
            # because ``async`` is a reserved keyword since Python 3.7.
            task = asyncio.ensure_future(coro())
            if callback:
                task.add_done_callback(callback)
            self._tasks.append(task)
            # Registered after the user callback so that one runs first.
            task.add_done_callback(self._forget_task)

    def _forget_task(self, task):
        """Drop a finished task so ``_tasks`` does not grow without bound."""
        try:
            self._tasks.remove(task)
        except ValueError:
            pass  # already removed; nothing left to clean up


# Module-level worker used by every Job below; it is constructed at import
# time and only started later, from main().
background_worker = BackgroundWorker()


class Job(object):
    """A numbered unit of work that is executed on the shared ``background_worker``.

    Args:
        idx: Number used to label this job's console output.
    """

    def __init__(self, idx):
        super().__init__()
        self._idx = idx

    def process(self):
        """Hand this job's coroutine function and its done-callback to the worker."""
        background_worker.submit_coro(self._process, self._process_callback)

    async def _process(self):
        """Hold one of the worker's semaphore slots for a simulated 2 second job.

        Written as a native coroutine: ``with (yield from semaphore)`` and
        ``@asyncio.coroutine`` have both been removed from recent Python
        versions, ``async with`` is the supported way to hold a semaphore.
        """
        async with background_worker.limit_simultaneous_processes:
            print("received processing slot %d" % self._idx)
            start = datetime.now()
            await asyncio.sleep(2)
            print("processing %d took %s" % (self._idx, str(datetime.now() - start)))

    def _process_callback(self, future):
        """Done-callback for the task; ``future`` is received but not inspected."""
        print("callback %d triggered" % self._idx)


def main():
    """Start the worker, submit ten jobs and wait for the user to type 'quit'.

    The worker is always stopped and joined on the way out, including when the
    prompt is interrupted. It is a non-daemon thread, so leaving it running
    would keep the process alive after main() has ended.
    """
    print("starting worker...")
    background_worker.start()
    try:
        for idx in range(10):
            download_job = Job(idx)
            download_job.process()

        command = None
        while command != "quit":
            try:
                command = input("enter 'quit' to stop the program: \n")
            except EOFError:
                # stdin was closed: no further command can arrive, so shut down.
                break
    finally:
        print("stopping...")
        background_worker.stop()
        background_worker.join()


# Run the demo only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()