2014-12-05 61 views
27

我正在嘗試編寫一個應用程序,該應用程序與multiprocessing.Pool同時應用一個函數。我想這個函數是一個實例方法(所以我可以在不同的子類中定義它)。這似乎不可能;正如我在別處瞭解到的,顯然是bound methods can't be pickled。那麼爲什麼開始一個帶有綁定方法的multiprocessing.Process作爲目標工作呢?下面的代碼:爲什麼我可以將實例方法傳遞給multiprocessing.Process,而不是multiprocessing.Pool?

import multiprocessing 

def test1(): 
    print "Hello, world 1" 

def increment(x): 
    return x + 1 

class testClass(): 
    def process(self): 
     process1 = multiprocessing.Process(target=test1) 
     process1.start() 
     process1.join() 
     process2 = multiprocessing.Process(target=self.test2) 
     process2.start() 
     process2.join() 

    def pool(self): 
     pool = multiprocessing.Pool(1) 
     for answer in pool.imap(increment, range(10)): 
      print answer 
     print 
     for answer in pool.imap(self.square, range(10)): 
      print answer 

    def test2(self): 
     print "Hello, world 2" 

    def square(self, x): 
     return x * x 

def main(): 
    c = testClass() 
    c.process() 
    c.pool() 

if __name__ == "__main__": 
    main() 

產生這樣的輸出:

Hello, world 1 
Hello, world 2 
1 
2 
3 
4 
5 
6 
7 
8 
9 
10 

Exception in thread Thread-2: 
Traceback (most recent call last): 
    File "C:\Python27\Lib\threading.py", line 551, in __bootstrap_inner 
    self.run() 
    File "C:\Python27\Lib\threading.py", line 504, in run 
    self.__target(*self.__args, **self.__kwargs) 
    File "C:\Python27\Lib\multiprocessing\pool.py", line 319, in _handle_tasks 
    put(task) 
PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed 

過程爲什麼可以處理綁定方法,但不池?

+0

這是因爲它們不能被'pickle'序列化。如果您需要使用python2.7,並且您需要使代碼按原樣工作......您應該使用'multiprocessing'的分支,它可以醃製實例方法並可以醃製一個'Pool'。看看'pathos.multiprocessing',你可以在上面的帖子中引用的stackoverflow鏈接中找到它。 – 2014-12-05 22:09:17

+0

更具體地說,該鏈接顯示2.x中的實例方法如何在「Pool」中簡單地序列化:http://stackoverflow.com/a/21345273/2379433 – 2015-04-26 18:10:01

+0

它是否必須是實例方法?你能夠使用classmethod嗎?我嘗試了它,併爲我工作得很好。 – 2017-08-27 20:20:30

回答

20

pickle模塊通常不能鹹菜實例方法:

>>> import pickle 
>>> class A(object): 
... def z(self): print "hi" 
... 
>>> a = A() 
>>> pickle.dumps(a.z) 
Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
    File "/usr/local/lib/python2.7/pickle.py", line 1374, in dumps 
    Pickler(file, protocol).dump(obj) 
    File "/usr/local/lib/python2.7/pickle.py", line 224, in dump 
    self.save(obj) 
    File "/usr/local/lib/python2.7/pickle.py", line 306, in save 
    rv = reduce(self.proto) 
    File "/usr/local/lib/python2.7/copy_reg.py", line 70, in _reduce_ex 
    raise TypeError, "can't pickle %s objects" % base.__name__ 
TypeError: can't pickle instancemethod objects 

然而,multiprocessing模塊has a custom Pickler that adds some code to enable this feature

# 
# Try making some callable types picklable 
# 

from pickle import Pickler 
class ForkingPickler(Pickler): 
    dispatch = Pickler.dispatch.copy() 

    @classmethod 
    def register(cls, type, reduce): 
     def dispatcher(self, obj): 
      rv = reduce(obj) 
      self.save_reduce(obj=obj, *rv) 
     cls.dispatch[type] = dispatcher 

def _reduce_method(m): 
    if m.im_self is None: 
     return getattr, (m.im_class, m.im_func.func_name) 
    else: 
     return getattr, (m.im_self, m.im_func.func_name) 
ForkingPickler.register(type(ForkingPickler.save), _reduce_method) 

您可以使用copy_reg模塊,看看它爲自己工作複製此:

>>> import copy_reg 
>>> def _reduce_method(m): 
...  if m.im_self is None: 
...   return getattr, (m.im_class, m.im_func.func_name) 
...  else: 
...   return getattr, (m.im_self, m.im_func.func_name) 
... 
>>> copy_reg.pickle(type(a.z), _reduce_method) 
>>> pickle.dumps(a.z) 
"c__builtin__\ngetattr\np0\n(ccopy_reg\n_reconstructor\np1\n(c__main__\nA\np2\nc__builtin__\nobject\np3\nNtp4\nRp5\nS'z'\np6\ntp7\nRp8\n." 

當您使用Process.start產卵在Windows上一個新的進程,it pickles all the parameters you passed to the child process using this custom ForkingPickler

# 
# Windows 
# 

else: 
    # snip... 
    from pickle import load, HIGHEST_PROTOCOL 

    def dump(obj, file, protocol=None): 
     ForkingPickler(file, protocol).dump(obj) 

    # 
    # We define a Popen class similar to the one from subprocess, but 
    # whose constructor takes a process object as its argument. 
    # 

    class Popen(object): 
     ''' 
     Start a subprocess to run the code of a process object 
     ''' 
     _tls = thread._local() 

     def __init__(self, process_obj): 
      # create pipe for communication with child 
      rfd, wfd = os.pipe() 

      # get handle for read end of the pipe and make it inheritable 
      ... 
      # start process 
      ... 

      # set attributes of self 
      ... 

      # send information to child 
      prep_data = get_preparation_data(process_obj._name) 
      to_child = os.fdopen(wfd, 'wb') 
      Popen._tls.process_handle = int(hp) 
      try: 
       dump(prep_data, to_child, HIGHEST_PROTOCOL) 
       dump(process_obj, to_child, HIGHEST_PROTOCOL) 
      finally: 
       del Popen._tls.process_handle 
       to_child.close() 

注意「發送信息給孩子」一節。它使用dump函數,它使用ForkingPickler來醃製數據,這意味着您的實例方法可能被醃製。

現在,當您使用multiprocessing.Pool上的方法向子進程發送方法時,它使用multiprocessing.Pipe來醃製數據。在Python 2.7中,multiprocessing.Pipe在C中實現,and calls pickle_dumps directly,所以它沒有利用ForkingPickler。這意味着酸洗實例方法不起作用。

但是,如果你使用copy_reg註冊instancemethod類型,而不是一個自定義Pickler所有企圖酸洗將受到影響。所以,你可以用它來使酸洗實例方法,甚至通過Pool

import multiprocessing 
import copy_reg 
import types 

def _reduce_method(m): 
    if m.im_self is None: 
     return getattr, (m.im_class, m.im_func.func_name) 
    else: 
     return getattr, (m.im_self, m.im_func.func_name) 
copy_reg.pickle(types.MethodType, _reduce_method) 

def test1(): 
    print("Hello, world 1") 

def increment(x): 
    return x + 1 

class testClass(): 
    def process(self): 
     process1 = multiprocessing.Process(target=test1) 
     process1.start() 
     process1.join() 
     process2 = multiprocessing.Process(target=self.test2) 
     process2.start() 
     process2.join() 

    def pool(self): 
     pool = multiprocessing.Pool(1) 
     for answer in pool.imap(increment, range(10)): 
      print(answer) 
     print 
     for answer in pool.imap(self.square, range(10)): 
      print(answer) 

    def test2(self): 
     print("Hello, world 2") 

    def square(self, x): 
     return x * x 

def main(): 
    c = testClass() 
    c.process() 
    c.pool() 

if __name__ == "__main__": 
    main() 

輸出:

Hello, world 1 
Hello, world 2 
GOT (0, 0, (True, 1)) 
GOT (0, 1, (True, 2)) 
GOT (0, 2, (True, 3)) 
GOT (0, 3, (True, 4)) 
GOT (0, 4, (True, 5)) 
1GOT (0, 5, (True, 6)) 

GOT (0, 6, (True, 7)) 
2 
GOT (0, 7, (True, 8)) 
3 
GOT (0, 8, (True, 9)) 
GOT (0, 9, (True, 10)) 
4 
5 
6 
7 
8 
9 
10 

GOT (1, 0, (True, 0)) 
0 
GOT (1, 1, (True, 1)) 
1 
GOT (1, 2, (True, 4)) 
4 
GOT (1, 3, (True, 9)) 
9 
GOT (1, 4, (True, 16)) 
16 
GOT (1, 5, (True, 25)) 
25 
GOT (1, 6, (True, 36)) 
36 
GOT (1, 7, (True, 49)) 
49 
GOT (1, 8, (True, 64)) 
64 
GOT (1, 9, (True, 81)) 
81 
GOT None 

還要注意,在Python 3.x中,pickle可以本地泡菜實例方法的類型,所以沒有這些東西更重要。 :)

+0

感謝您的建議;我很驚訝多處理模塊沒有實現這一點。您的確切解決方案對我來說不起作用,因爲它涉及對該方法綁定的實例進行酸洗,這會導致其他問題,但它指向了正確的方向。相反,我定義了在模塊的頂層進行多處理期間要運行的方法,以避免這兩個問題並獲得我想要的行爲。 – dpitch40 2014-12-09 20:50:15

7

下面是我有時使用的一種替代方法,它在Python2中起作用。X:

您可以創建各種以實例方法,接受一個對象,你要在池中運行,其實例方法,並將它要求你的實例方法的頂級「別名」:

import functools 
import multiprocessing 

def _instance_method_alias(obj, arg): 
    """ 
    Alias for instance method that allows the method to be called in a 
    multiprocessing pool 
    """ 
    obj.instance_method(arg) 
    return 

class MyClass(object): 
    """ 
    Our custom class whose instance methods we want to be able to use in a 
    multiprocessing pool 
    """ 

    def __init__(self): 
     self.my_string = "From MyClass: {}" 

    def instance_method(self, arg): 
     """ 
     Some arbitrary instance method 
     """ 

     print(self.my_string.format(arg)) 
     return 

# create an object of MyClass 
obj = MyClass() 

# use functools.partial to create a new method that always has the 
# MyClass object passed as its first argument 
_bound_instance_method_alias = functools.partial(_instance_method_alias, obj) 

# create our list of things we will use the pool to map 
l = [1,2,3] 

# create the pool of workers 
pool = multiprocessing.Pool() 

# call pool.map, passing it the newly created function 
pool.map(_bound_instance_method_alias, l) 

# cleanup 
pool.close() 
pool.join() 

這段代碼產生以下輸出:

從MyClass的:1
從MyClass的:2
從MyClass的:3

一個限制是您不能將此用於修改對象的方法。每個進程都會獲取其調用方法的對象的副本,因此更改不會傳播回主進程。如果你不需要從你調用的方法中修改對象,這可以是一個簡單的解決方案。

+1

感謝這篇文章,在深入酸洗等後對我有意義,這對我很有用。 Python 3將(最終)彌補這一差距。乾杯! – leomelzer 2015-09-30 09:43:15

4

以下是Python 2中更簡單的工作方式,只是包裝原始實例方法。在MacOSX和Linux上運行良好,不適用於Windows,測試Python 2.7

from multiprocessing import Pool 

class Person(object): 
    def __init__(self): 
     self.name = 'Weizhong Tu' 

    def calc(self, x): 
     print self.name 
     return x ** 5 


def func(x, p=Person()): 
    return p.calc(x) 


pool = Pool() 
print pool.map(func, range(10)) 
相關問題