4

我經常使用python對象和方法阻塞,直到完成,並且希望將這些方法轉換爲非阻塞版本。我發現自己執行下面的模式相當頻繁:什麼是使對象的非阻塞版本的Pythonic方式?

  1. 定義對象
  2. 定義其創建的對象實例的功能,並解析命令來調用對象的方法
  3. 定義一個「父」該對象創建運行第2步中定義的函數的子進程,並且複製原始對象的方法。

這樣做可以完成工作,但涉及很多繁瑣的代碼重複,而且對我來說看起來並不是很枯燥。 有沒有一個標準的,更好的方法來做到這一點?

高度簡化的例子來說明圖案我已經使用:

import ctypes 
import Queue 
import multiprocessing as mp 

class Hardware: 
    def __init__(
     self, 
     other_init_args): 
     self.dll = ctypes.cll.LoadLibrary('hardware.dll') 
     self.dll.Initialize(other_init_args) 

    def blocking_command(self, arg_1, arg_2, arg_3): 
     """ 
     This command takes a long time to execute, and blocks while it 
     executes. However, while it's executing, we have to coordinate 
     other pieces of hardware too, so blocking is bad. 
     """ 
     self.dll.Takes_A_Long_Time(arg_1, arg_2, arg_3) 

    def change_settings(self, arg_1, arg_2): 
     """ 
     Realistically, there's tons of other functions in the DLL we 
     want to expose as methods. For this example, just one. 
     """ 
     self.dll.Change_Settings(arg_1, arg_2) 

    def close(self): 
     self.dll.Quit() 

def hardware_child_process(
    commands, 
    other_init_args): 
    hw = Hardware(other_init_args) 
    while True: 
     cmd, args = commands.recv() 
     if cmd == 'start': 
      hw.blocking_command(**args) 
     elif cmd == 'change_settings': 
      hw.change_settings(**args) 
     elif cmd == 'quit': 
      break 
    hw.close() 

class Nonblocking_Hardware: 
    """ 
    This class (hopefully) duplicates the functionality of the 
    Hardware class, except now Hardware.blocking_command() doesn't 
    block other execution. 
    """ 
    def __init__(
     self, 
     other_init_args): 
     self.commands, self.child_commands = mp.Pipe() 
     self.child = mp.Process(
      target=hardware_child_process, 
      args=(self.child_commands, 
        other_init_args)) 
     self.child.start() 

    def blocking_command(self, arg_1, arg_2, arg_3): 
     """ 
     Doesn't block any more! 
     """ 
     self.commands.send(
      ('start', 
      {'arg_1': arg_1, 
       'arg_2': arg_2, 
       'arg_3': arg_3})) 

    def change_settings(self, arg_1, arg_2): 
     self.commands.send(
      ('change_settings', 
      {'arg_1': arg_1, 
       'arg_2': arg_2})) 

    def close(self): 
     self.commands.send(('quit', {})) 
     self.child.join() 
     return None 

背景故事:

我使用Python來控制硬件,通常是通過閉源的DLL我請使用ctypes。通常,我最終希望從DLL中調用哪些塊直到執行完成,但我不希望我的控制代碼被阻塞。例如,我可能會使用模擬輸出卡將照相機與照明同步。在模擬輸出卡可以向攝像機發送觸發脈衝之前,必須調用攝像機DLL「捕捉」功能,但是「捕捉」命令會阻止我啓動模擬輸出卡。

+1

'ctypes'釋放GIL這樣的標準方法,使非阻塞會主題 - 扭曲'deferToThread()',''asyncio' run_in_executor()'或其他一些方式來運行的功能一個線程池('multiprocessing.pool.ThreadPool','concurrent.futures.ThreadPoolExecutor')。 – jfs 2014-09-25 01:54:44

回答

2

我已經通過使用元類來創建非阻塞版本的阻塞函數的對象。它允許你只是這樣做是爲了創建一個類的非阻塞版本:

class NB_Hardware(object): 
    __metaclass__ = NonBlockBuilder 
    delegate = Hardware 
    nb_funcs = ['blocking_command'] 

我已把原來的實現,從而有針對性的Python 3和使用concurrent.futures.ThreadPoolExecutor(我是包裝阻塞I/O調用在asyncio上下文*)中使它們成爲非阻塞,並將它們改編爲使用Python 2和concurrent.futures.ProcessPoolExecutor。這裏是元類的實現及其輔助類:

from multiprocessing import cpu_count 
from concurrent.futures import ProcessPoolExecutor 

def runner(self, cb, *args, **kwargs): 
    return getattr(self, cb)(*args, **kwargs) 

class _ExecutorMixin(): 
    """ A Mixin that provides asynchronous functionality. 

    This mixin provides methods that allow a class to run 
    blocking methods in a ProcessPoolExecutor. 
    It also provides methods that attempt to keep the object 
    picklable despite having a non-picklable ProcessPoolExecutor 
    as part of its state. 

    """ 
    pool_workers = cpu_count() 

    def run_in_executor(self, callback, *args, **kwargs): 
     """ Runs a function in an Executor. 

     Returns a concurrent.Futures.Future 

     """ 
     if not hasattr(self, '_executor'): 
      self._executor = self._get_executor() 

     return self._executor.submit(runner, self, callback, *args, **kwargs) 

    def _get_executor(self): 
     return ProcessPoolExecutor(max_workers=self.pool_workers) 

    def __getattr__(self, attr): 
     if (self._obj and hasattr(self._obj, attr) and 
      not attr.startswith("__")): 
      return getattr(self._obj, attr) 
     raise AttributeError(attr) 

    def __getstate__(self): 
     self_dict = self.__dict__ 
     self_dict['_executor'] = None 
     return self_dict 

    def __setstate__(self, state): 
     self.__dict__.update(state) 
     self._executor = self._get_executor() 

class NonBlockBuilder(type): 
    """ Metaclass for adding non-blocking versions of methods to a class. 

    Expects to find the following class attributes: 
    nb_funcs - A list containing methods that need non-blocking wrappers 
    delegate - The class to wrap (add non-blocking methods to) 
    pool_workers - (optional) how many workers to put in the internal pool. 

    The metaclass inserts a mixin (_ExecutorMixin) into the inheritence 
    hierarchy of cls. This mixin provides methods that allow 
    the non-blocking wrappers to do their work. 

    """ 
    def __new__(cls, clsname, bases, dct, **kwargs): 
     nbfunc_list = dct.get('nb_funcs', []) 
     existing_nbfuncs = set() 

     def find_existing_nbfuncs(d): 
      for attr in d: 
       if attr.startswith("nb_"): 
        existing_nbfuncs.add(attr) 

     # Determine if any bases include the nb_funcs attribute, or 
     # if either this class or a base class provides an actual 
     # implementation for a non-blocking method. 
     find_existing_nbfuncs(dct) 
     for b in bases: 
      b_dct = b.__dict__ 
      nbfunc_list.extend(b_dct.get('nb_funcs', [])) 
      find_existing_nbfuncs(b_dct) 

     # Add _ExecutorMixin to bases. 
     if _ExecutorMixin not in bases: 
      bases += (_ExecutorMixin,) 

     # Add non-blocking funcs to dct, but only if a definition 
     # is not already provided by dct or one of our bases. 
     for func in nbfunc_list: 
      nb_name = 'nb_{}'.format(func) 
      if nb_name not in existing_nbfuncs: 
       dct[nb_name] = cls.nbfunc_maker(func) 

     return super(NonBlockBuilder, cls).__new__(cls, clsname, bases, dct) 

    def __init__(cls, name, bases, dct): 
     """ Properly initialize a non-blocking wrapper. 

     Sets pool_workers and delegate on the class, and also 
     adds an __init__ method to it that instantiates the 
     delegate with the proper context. 

     """ 
     super(NonBlockBuilder, cls).__init__(name, bases, dct) 
     pool_workers = dct.get('pool_workers') 
     delegate = dct.get('delegate') 
     old_init = dct.get('__init__') 
     # Search bases for values we care about, if we didn't 
     # find them on the child class. 
     for b in bases: 
      if b is object: # Skip object 
       continue 
      b_dct = b.__dict__ 
      if not pool_workers: 
       pool_workers = b_dct.get('pool_workers') 
      if not delegate: 
       delegate = b_dct.get('delegate') 
      if not old_init: 
       old_init = b_dct.get('__init__') 

     cls.delegate = delegate 

     # If we found a value for pool_workers, set it. If not, 
     # ExecutorMixin sets a default that will be used. 
     if pool_workers: 
      cls.pool_workers = pool_workers 

     # Here's the __init__ we want every wrapper class to use. 
     # It just instantiates the delegate object. 
     def init_func(self, *args, **kwargs): 
      # Be sure to call the original __init__, if there 
      # was one. 
      if old_init: 
       old_init(self, *args, **kwargs) 

      if self.delegate: 
       self._obj = self.delegate(*args, **kwargs) 
     cls.__init__ = init_func 

    @staticmethod 
    def nbfunc_maker(func): 
     def nb_func(self, *args, **kwargs): 
      return self.run_in_executor(func, *args, **kwargs) 
     return nb_func 

用法:

from nb_helper import NonBlockBuilder 
import time 


class Hardware: 
    def __init__(self, other_init_args): 
     self.other = other_init_args 

    def blocking_command(self, arg_1, arg_2, arg_3): 
     print("start blocking") 
     time.sleep(5) 
     return "blocking" 

    def normal_command(self): 
     return "normal" 


class NBHardware(object): 
    __metaclass__ = NonBlockBuilder 
    delegate = Hardware 
    nb_funcs = ['blocking_command'] 


if __name__ == "__main__": 
    h = NBHardware("abc") 
    print "doing blocking call" 
    print h.blocking_command(1,2,3) 
    print "done" 
    print "doing non-block call" 
    x = h.nb_blocking_command(1,2,3) # This is non-blocking and returns concurrent.future.Future 
    print h.normal_command() # You can still use the normal functions, too. 
    print x.result() # Waits for the result from the Future 

輸出:

doing blocking call 
start blocking 
< 5 second delay > 
blocking 
done 
doing non-block call 
start blocking 
normal 
< 5 second delay > 
blocking 

的一個棘手的一塊你是確保Hardware是picklable。你可以這樣做,通過__getstate__刪除dll對象,並在__setstate__中重新創建它,類似於_ExecutorMixin的做法。

您還需要Python 2.x backport of concurrent.futures

請注意,元類中存在一堆複雜性,因此它們可以繼承正常工作,並支持諸如提供__init__nb_*方法的自定義實現。例如,像這樣支持:

class AioBaseLock(object): 
    __metaclass__ = NonBlockBuilder 
    pool_workers = 1 
    coroutines = ['acquire', 'release'] 

def __init__(self, *args, **kwargs): 
    self._threaded_acquire = False 
    def _after_fork(obj): 
     obj._threaded_acquire = False 
    register_after_fork(self, _after_fork) 

def coro_acquire(self, *args, **kwargs): 
    def lock_acquired(fut): 
     if fut.result(): 
      self._threaded_acquire = True 

    out = self.run_in_executor(self._obj.acquire, *args, **kwargs) 
    out.add_done_callback(lock_acquired) 
    return out 

class AioLock(AioBaseLock): 
    delegate = Lock 


class AioRLock(AioBaseLock): 
    delegate = RLock 

如果您不需要這種靈活性,可以簡化實施頗有幾分:

class NonBlockBuilder(type): 
    """ Metaclass for adding non-blocking versions of methods to a class. 

    Expects to find the following class attributes: 
    nb_funcs - A list containing methods that need non-blocking wrappers 
    delegate - The class to wrap (add non-blocking methods to) 
    pool_workers - (optional) how many workers to put in the internal pool. 

    The metaclass inserts a mixin (_ExecutorMixin) into the inheritence 
    hierarchy of cls. This mixin provides methods that allow 
    the non-blocking wrappers to do their work. 

    """ 
    def __new__(cls, clsname, bases, dct, **kwargs): 
     nbfunc_list = dct.get('nb_funcs', []) 

     # Add _ExecutorMixin to bases. 
     if _ExecutorMixin not in bases: 
      bases += (_ExecutorMixin,) 

     # Add non-blocking funcs to dct, but only if a definition 
     # is not already provided by dct or one of our bases. 
     for func in nbfunc_list: 
      nb_name = 'nb_{}'.format(func) 
      dct[nb_name] = cls.nbfunc_maker(func) 

     return super(NonBlockBuilder, cls).__new__(cls, clsname, bases, dct) 

    def __init__(cls, name, bases, dct): 
     """ Properly initialize a non-blocking wrapper. 

     Sets pool_workers and delegate on the class, and also 
     adds an __init__ method to it that instantiates the 
     delegate with the proper context. 

     """ 
     super(NonBlockBuilder, cls).__init__(name, bases, dct) 
     pool_workers = dct.get('pool_workers') 
     cls.delegate = dct['delegate'] 

     # If we found a value for pool_workers, set it. If not, 
     # ExecutorMixin sets a default that will be used. 
     if pool_workers: 
      cls.pool_workers = pool_workers 

     # Here's the __init__ we want every wrapper class to use. 
     # It just instantiates the delegate object. 
     def init_func(self, *args, **kwargs): 
      self._obj = self.delegate(*args, **kwargs) 
     cls.__init__ = init_func 

    @staticmethod 
    def nbfunc_maker(func): 
     def nb_func(self, *args, **kwargs): 
      return self.run_in_executor(func, *args, **kwargs) 
     return nb_func 

*原代碼是here , 以供參考。

2

我用來異步啓動類方法的一種方法是創建一個池並使用apply_async調用幾個函數別名,而不是直接調用類方法。

說你有你的類的更簡單的版本:

class Hardware: 
    def __init__(self, stuff): 
     self.stuff = stuff 
     return 

    def blocking_command(self, arg1): 
     self.stuff.call_function(arg1) 
     return 

在你的模塊的頂層,定義一個新的功能,看起來像這樣:

def _blocking_command(Hardware_obj, arg1): 
    return Hardware_obj.blocking_command(Hardware_obj, arg1) 

由於類這個「別名」功能都定義在模塊的頂層,它們可以醃製,並且可以使用多處理庫來啓動它:

import multiprocessing 

hw_obj = Harware(stuff) 
pool = multiprocessing.Pool() 

results_obj = pool.apply_async(_blocking_command, (hw_obj, arg1)) 

函數調用的結果將在結果對象中可用。我喜歡這種方法,因爲它使用相對較少的代碼來使並行更容易。具體而言,它只添加了幾行代碼,而不是任何類,並且除了多處理之外,不需要額外的導入。

注:

  1. 不要使用這個對於需要修改的對象屬性的方法,但如果是後使用所有屬性已設置的類的,它工作正常,有效將類屬性視爲「只讀」。

  2. 你也可以在類方法中使用這種方法來啓動其他類方法,你只需要顯式地傳遞「self」。這可以讓你將浮動的「hardware_child_process」函數移入類中。它仍然可以充當一堆異步進程的調度器,但它會將該功能集中到您的Hardware類中。

相關問題