2009-11-29 77 views
168

我試圖使用multiprocessingPool.map()函數來同時分配工作。當我使用下面的代碼,它工作正常:使用多處理時不能pickle <type'instancemethod'> Pool.map()

import multiprocessing 

def f(x): 
    return x*x 

def go(): 
    pool = multiprocessing.Pool(processes=4)   
    print pool.map(f, range(10)) 


if __name__== '__main__' : 
    go() 

然而,當我在一個更面向對象的方法使用它,這是行不通的。它給人的錯誤信息是:

import someClass 

if __name__== '__main__' : 
    sc = someClass.someClass() 
    sc.go() 

和下面是我someClass類:

import multiprocessing 

class someClass(object): 
    def __init__(self): 
     pass 

    def f(self, x): 
     return x*x 

    def go(self): 
     pool = multiprocessing.Pool(processes=4)  
     print pool.map(self.f, range(10)) 

任何人都知道的

PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup 
__builtin__.instancemethod failed 

這時候下面是我的主要程序中出現問題可能是或簡單的解決方法?

+2

如果f是一個嵌套函數有一個類似的錯誤'PicklingError:不能鹹菜的:屬性查找建宏。功能失敗' – ggg

回答

95

問題是多處理必須醃製一些東西來將它們吊在進程中,並且綁定方法不可用。解決方法(無論您認爲「簡單」還是不行;-)是將基礎架構添加到您的程序中,以允許將這些方法醃漬,並將其註冊爲標準庫方法。

例如,Steven Bethard對this thread的貢獻顯示了一種完全可行的方法,允許通過copy_reg進行酸洗/去除方法。

+0

太好了 - 謝謝。似乎以某種方式取得了進展,無論如何:使用http://pastebin.ca/1693348上的代碼,我現在得到一個RuntimeError:超出最大遞歸深度。我環顧四周,一個論壇帖子建議將最大深度增加到1500(默認1000),但我沒有那裏的喜悅。說實話,我看不出什麼部分(至少是我的代碼)可能會失去控制,除非由於某些原因代碼在循環中進行酸洗和取消操作,這是由於我爲使史蒂文的代碼OO? – ventolin

+1

您的'_pickle_method'返回'self._unpickle_method',一個綁定的方法;所以當然pickle現在試着去醃製這個東西 - 它的確如你所說的那樣:遞歸地調用'_pickle_method'。即通過用這種方式編寫代碼,你不可避免地引入了無限遞歸。我建議回到Steven的代碼中(當不恰當的時候,不要在OO的祭壇上敬拜:Python中的許多事情最好以更實用的方式完成,而且這是一個)。 –

+7

[for the lazy](http://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods#edit2155350) – John

28

您也可以在someClass()中定義一個__call__()方法,該方法調用someClass.go(),然後將someClass()的實例傳遞給池。這個對象是與pickle,它工作正常(我)......

+2

這比Alex Martelli提出的技術容易得多,但是您僅限於每個類僅向您的多處理池發送一個方法。 – deprecated

+6

要牢記的另一個細節是,它只是*被醃製的對象(類實例),而不是類本身。因此,如果您已將任何類屬性從其默認值更改,則這些更改不會傳播到不同的進程。解決方法是確保您的函數需要的所有內容都存儲爲實例屬性。 – deprecated

+2

@dorvak可以用'__call __()'來表示一個簡單的例子嗎?我認爲你的答案可能是更清晰的答案 - 我努力理解這個錯誤,並且第一次來看看電話。順便說一下,這個答案也有助於澄清什麼多處理功能:[http://stackoverflow.com/a/20789937/305883] – user305883

17

一些限制雖然對史蒂芬Bethard的解決方案:

當您註冊類方法,如函數,類的析構函數被稱爲出奇每你的方法處理完成的時間。所以如果你有一個類的實例調用n次方法,那麼成員可能在2次運行之間消失,你可能會收到一條消息malloc: *** error for object 0x...: pointer being freed was not allocated(例如打開的成員文件)或pure virtual method called, terminate called without an active exception(這意味着比我使用的成員對象的生命期比我想象的要短)。我在處理大於池大小的n時得到了這個結果。下面是一個簡單的例子:

from multiprocessing import Pool, cpu_count 
from multiprocessing.pool import ApplyResult 

# --------- see Stenven's solution above ------------- 
from copy_reg import pickle 
from types import MethodType 

def _pickle_method(method): 
    func_name = method.im_func.__name__ 
    obj = method.im_self 
    cls = method.im_class 
    return _unpickle_method, (func_name, obj, cls) 

def _unpickle_method(func_name, obj, cls): 
    for cls in cls.mro(): 
     try: 
      func = cls.__dict__[func_name] 
     except KeyError: 
      pass 
     else: 
      break 
    return func.__get__(obj, cls) 


class Myclass(object): 

    def __init__(self, nobj, workers=cpu_count()): 

     print "Constructor ..." 
     # multi-processing 
     pool = Pool(processes=workers) 
     async_results = [ pool.apply_async(self.process_obj, (i,)) for i in range(nobj) ] 
     pool.close() 
     # waiting for all results 
     map(ApplyResult.wait, async_results) 
     lst_results=[r.get() for r in async_results] 
     print lst_results 

    def __del__(self): 
     print "... Destructor" 

    def process_obj(self, index): 
     print "object %d" % index 
     return "results" 

pickle(MethodType, _pickle_method, _unpickle_method) 
Myclass(nobj=8, workers=3) 
# problem !!! the destructor is called nobj times (instead of once) 

輸出:

Constructor ... 
object 0 
object 1 
object 2 
... Destructor 
object 3 
... Destructor 
object 4 
... Destructor 
object 5 
... Destructor 
object 6 
... Destructor 
object 7 
... Destructor 
... Destructor 
... Destructor 
['results', 'results', 'results', 'results', 'results', 'results', 'results', 'results'] 
... Destructor 

__call__方法不是那麼等效,因爲[無,...]被從結果如下:

from multiprocessing import Pool, cpu_count 
from multiprocessing.pool import ApplyResult 

class Myclass(object): 

    def __init__(self, nobj, workers=cpu_count()): 

     print "Constructor ..." 
     # multiprocessing 
     pool = Pool(processes=workers) 
     async_results = [ pool.apply_async(self, (i,)) for i in range(nobj) ] 
     pool.close() 
     # waiting for all results 
     map(ApplyResult.wait, async_results) 
     lst_results=[r.get() for r in async_results] 
     print lst_results 

    def __call__(self, i): 
     self.process_obj(i) 

    def __del__(self): 
     print "... Destructor" 

    def process_obj(self, i): 
     print "obj %d" % i 
     return "result" 

Myclass(nobj=8, workers=3) 
# problem !!! the destructor is called nobj times (instead of once), 
# **and** results are empty ! 

所以這兩種方法都不能令人滿意...

+7

由於'__call__'的定義缺少'return',所以''返回'None',它應該是'return self.process_obj(i)'。 – torek

+0

@Eric我得到了同樣的錯誤,我試過這個解決方案,但是我開始得到新的錯誤,因爲「cPickle.PicklingError:Can not pickle :attribute lookup __builtin __。function failed」。你知道背後有什麼可能的原因嗎? – Naman

8

還有另一個捷徑可以使用,雖然它可能是無效的這取決於你的類實例中的內容。

正如大家所說的問題是multiprocessing代碼必須醃製它發送到它已經開始的子進程的東西,pickler不會執行實例方法。

但是,您可以不用發送實例方法,而是將實際的類實例以及要調用的函數的名稱發送到一個普通函數,然後使用getattr來調用實例方法,從而創建邊界方法在Pool子流程中。這與定義__call__方法類似,只不過您可以調用多個成員函數。

從他的回答中竊取@ EricH的代碼並對其進行了註釋(我重新輸入了它,因此出於某種原因,這似乎比剪切粘貼更容易:-))所有的魔法:

import multiprocessing 
import os 

def call_it(instance, name, args=(), kwargs=None): 
    "indirect caller for instance methods and multiprocessing" 
    if kwargs is None: 
     kwargs = {} 
    return getattr(instance, name)(*args, **kwargs) 

class Klass(object): 
    def __init__(self, nobj, workers=multiprocessing.cpu_count()): 
     print "Constructor (in pid=%d)..." % os.getpid() 
     self.count = 1 
     pool = multiprocessing.Pool(processes = workers) 
     async_results = [pool.apply_async(call_it, 
      args = (self, 'process_obj', (i,))) for i in range(nobj)] 
     pool.close() 
     map(multiprocessing.pool.ApplyResult.wait, async_results) 
     lst_results = [r.get() for r in async_results] 
     print lst_results 

    def __del__(self): 
     self.count -= 1 
     print "... Destructor (in pid=%d) count=%d" % (os.getpid(), self.count) 

    def process_obj(self, index): 
     print "object %d" % index 
     return "results" 

Klass(nobj=8, workers=3) 

輸出顯示,確實,構造函數被調用一次(在原始PID)和析構函數被調用9次(一次爲每個拷貝每pool-製成= 2或3倍工人進程根據需要加上一次在原始進程中)。這往往是OK,因爲在這種情況下,由於默認Pickler會使得整個實例的副本,(半)祕密重新填充它,在這種情況下,這樣做的:

obj = object.__new__(Klass) 
obj.__dict__.update({'count':1}) 

- 這就是爲什麼即使析構函數在三個工作進程中被調用八次,每次都從1到0遞減 - 但是當然你仍然可以通過這種方式陷入困境。如有必要,您可以提供您自己的__setstate__

def __setstate__(self, adict): 
     self.count = adict['count'] 

例如在這種情況下。

52

這些解決方案的全部是醜陋的,因爲多,酸洗被打破,除非有限,你跳的標準庫外。

如果使用名爲pathos.multiprocesssingmultiprocessing的分支,可以在多處理的map函數中直接使用類和類方法。這是因爲dill使用的picklecPickle,並dill而是可以序列化Python中幾乎所有的東西。

pathos.multiprocessing還提供了異步映射功能......,它可以map功能與多個參數(如map(math.pow, [1,2,3], [4,5,6])

參見: What can multiprocessing and dill do together?

和: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/

>>> import pathos.pools as pp 
>>> p = pp.ProcessPool(4) 
>>> 
>>> def add(x,y): 
... return x+y 
... 
>>> x = [0,1,2,3] 
>>> y = [4,5,6,7] 
>>> 
>>> p.map(add, x, y) 
[4, 6, 8, 10] 
>>> 
>>> class Test(object): 
... def plus(self, x, y): 
...  return x+y 
... 
>>> t = Test() 
>>> 
>>> p.map(Test.plus, [t]*4, x, y) 
[4, 6, 8, 10] 
>>> 
>>> p.map(t.plus, x, y) 
[4, 6, 8, 10] 

而只是爲了明確地說,你可以做到完全想要你想要做的第一個地方,你可以做到這一點如果你願意的話,你可以去翻譯。

>>> import pathos.pools as pp 
>>> class someClass(object): 
... def __init__(self): 
...  pass 
... def f(self, x): 
...  return x*x 
... def go(self): 
...  pool = pp.ProcessPool(4) 
...  print pool.map(self.f, range(10)) 
... 
>>> sc = someClass() 
>>> sc.go() 
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81] 
>>> 

獲取這裏的代碼: https://github.com/uqfoundation/pathos

+2

請問您可以根據pathos.pp更新此答案,因爲pathos.multiprocessing不存在了嗎? –

+0

好吧,看起來像pathos.pp也不存在!雖然它在網站上的例子中提到過。我正在使用pathos-0.1a1 –

+8

我是'pathos'作者。你提到的版本已經有幾年了。試試github上的版本,你可以使用'pathos.pp'或https://github.com/uqfoundation/ppft。 –

4

您也可以將您的someClass(),這就要求someClass.go()再通someClass()實例池裏面定義__call__()方法。這個對象是與pickle,它工作正常(我)......

class someClass(object): 
    def __init__(self): 
     pass 
    def f(self, x): 
     return x*x 

    def go(self): 
     p = Pool(4) 
     sc = p.map(self, range(4)) 
     print sc 

    def __call__(self, x): 
    return self.f(x) 

sc = someClass() 
sc.go() 
相關問題