你的代碼失敗,因爲它無法pickle
實例方法(self.cal
),這是Python的試圖通過它們映射到multiprocessing.Pool
(當然,是有辦法做到這一點,當你產卵多個進程做,但它的方式太令人費解,而不是非常有用反正) - 因爲有它有「包」的數據並將其發送到生成的過程,拆包沒有共享內存的訪問。如果您試圖醃製a
實例,則會發生同樣的情況。
在multiprocessing
包唯一可用的共享內存訪問是一個鮮爲人知的multiprocessing.pool.ThreadPool
所以如果你真的想這樣做:
from multiprocessing.pool import ThreadPool
class A():
def __init__(self, vl):
self.vl = vl
def cal(self, nb):
return nb * self.vl
def run(self, dt):
t = ThreadPool(processes=4)
rs = t.map(self.cal, dt)
t.close()
return rs
a = A(2)
print(a.run(list(range(10))))
# prints: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
但是,這不會給你的並行化,因爲它本質上映射到你的正常的線程它們可以訪問共享內存。你應該通過類/靜態方法,而不是(如果你需要他們的稱呼)伴隨着這些數據,您希望他們與(在你的情況self.vl
)工作。如果您需要跨進程共享數據,則必須使用一些共享內存抽象,如multiprocessing.Value
,當然應用互斥量。
UPDATE
我說你可以做到這一點(也有或多或少都在做它的模塊,檢查pathos.multiprocessing
爲例),但我不認爲這是值得的麻煩 - 當你來到你必須欺騙你的系統做你想做的事情,有可能是你使用了錯誤的系統,或者你應該重新考慮你的設計。但是,對於信息性的緣故,這裏是做你想做的事,一個多設置一個辦法:
import sys
from multiprocessing import Pool
def parallel_call(params): # a helper for calling 'remote' instances
cls = getattr(sys.modules[__name__], params[0]) # get our class type
instance = cls.__new__(cls) # create a new instance without invoking __init__
instance.__dict__ = params[1] # apply the passed state to the new instance
method = getattr(instance, params[2]) # get the requested method
args = params[3] if isinstance(params[3], (list, tuple)) else [params[3]]
return method(*args) # expand arguments, call our method and return the result
class A(object):
def __init__(self, vl):
self.vl = vl
def cal(self, nb):
return nb * self.vl
def run(self, dt):
t = Pool(processes=4)
rs = t.map(parallel_call, self.prepare_call("cal", dt))
t.close()
return rs
def prepare_call(self, name, args): # creates a 'remote call' package for each argument
for arg in args:
yield [self.__class__.__name__, self.__dict__, name, arg]
if __name__ == "__main__": # important protection for cross-platform use
a = A(2)
print(a.run(list(range(10))))
# prints: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
我認爲這是非常自我解釋它是如何工作的,但總之它通過你的類的名稱,其當前狀態(沒有信號,壽),被稱爲一個期望的方法和參數與一個parallel_call
函數被稱爲在Pool
每個過程調用它。蟒自動泡菜和unpickles所有這些數據,因此所有parallel_call
需要做的是重建原始對象,找到在它的期望的方法和使用所提供的參數(或多個)調用它。
這樣,我們只傳遞數據而不嘗試傳遞活動對象,所以Python不會抱怨(在這種情況下,嘗試將實例方法的引用添加到類參數並查看會發生什麼情況)以及一切正常。
如果你想要沉迷於'魔法',你可以使它看起來完全像你的代碼(創建你自己的Pool
處理程序,從函數中取出名稱並將名稱發送到實際進程等),但是這個應該爲您的示例提供足夠的功能。但是,在您提出自己的希望之前,請記住,只有在共享「靜態」實例(一旦您在多處理環境中調用該實例後,它不會更改其初始狀態的實例)時,它纔會起作用。如果A.cal
方法要更改vl
屬性的內部狀態 - 它只會影響其更改的實例(除非主要實例在調用之間調用Pool
時發生更改)。如果你想分享狀態,你可以升級parallel_call
在通話結束後拿起instance.__dict__
,並將其與方法調用結果一起返回,然後在主叫方,你必須更新本地__dict__
,返回的數據爲改變原來的狀態。這還不夠 - 你實際上必須創建一個共享字典並處理所有的互斥體工作人員讓它可以被所有進程同時訪問(你可以使用multiprocessing.Manager
)。
所以,我剛纔說了,超過其價值的麻煩......
使用'如果__name__ == '__main __''後衛。 – user2357112
這似乎並不需要多處理。你在使用更大的數據還是什麼?或不同的方法?產生執行一個乘法的過程的開銷並不值得。 – acushner
一個類似的問題出現在'joblib'前面;使用numpy會更快,因爲新流程的複製和上下文切換工作量很大。 https://stackoverflow.com/questions/44084513/joblib-simple-example-parallel-example-slower-than-simple/44084595?noredirect=1#comment75345269_44084595 –