2017-05-25 181 views
0

最初多處理,我有一個類存儲一些處理後的值和重新使用那些與它的其它方法。呼叫類方法的Python

問題是,當我試圖將類方法分成多個進程加速,python衍生的進程,但似乎沒有工作(正如我在任務管理器中看到,只有1個進程正在運行),結果是從來沒有交付。

我做了幾個搜索,發現pathos.multiprocessing可以做到這一點,而不是,但我不知道是否標準庫就可以解決這個問題呢?

from multiprocessing import Pool 

class A(): 
    def __init__(self, vl): 
     self.vl = vl 
    def cal(self, nb): 
     return nb * self.vl 
    def run(self, dt): 
     t = Pool(processes=4) 
     rs = t.map(self.cal, dt) 
     t.close() 
     return t 

a = A(2) 

a.run(list(range(10))) 
+2

使用'如果__name__ == '__main __''後衛。 – user2357112

+0

這似乎並不需要多處理。你在使用更大的數據還是什麼?或不同的方法?產生執行一個乘法的過程的開銷並不值得。 – acushner

+0

一個類似的問題出現在'joblib'前面;使用numpy會更快,因爲新流程的複製和上下文切換工作量很大。 https://stackoverflow.com/questions/44084513/joblib-simple-example-parallel-example-slower-than-simple/44084595?noredirect=1#comment75345269_44084595 –

回答

2

你的代碼失敗,因爲它無法pickle實例方法(self.cal),這是Python的試圖通過它們映射到multiprocessing.Pool(當然,是有辦法做到這一點,當你產卵多個進程做,但它的方式太令人費解,而不是非常有用反正) - 因爲有它有「包」的數據並將其發送到生成的過程,拆包沒有共享內存的訪問。如果您試圖醃製a實例,則會發生同樣的情況。

multiprocessing包唯一可用的共享內存訪問是一個鮮爲人知的multiprocessing.pool.ThreadPool所以如果你真的想這樣做:

from multiprocessing.pool import ThreadPool 

class A(): 
    def __init__(self, vl): 
     self.vl = vl 
    def cal(self, nb): 
     return nb * self.vl 
    def run(self, dt): 
     t = ThreadPool(processes=4) 
     rs = t.map(self.cal, dt) 
     t.close() 
     return rs 

a = A(2) 
print(a.run(list(range(10)))) 
# prints: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18] 

但是,這不會給你的並行化,因爲它本質上映射到你的正常的線程它們可以訪問共享內存。你應該通過類/靜態方法,而不是(如果你需要他們的稱呼)伴隨着這些數據,您希望他們與(在你的情況self.vl)工作。如果您需要跨進程共享數據,則必須使用一些共享內存抽象,如multiprocessing.Value,當然應用互斥量。

UPDATE

我說你可以做到這一點(也有或多或少都在做它的模塊,檢查pathos.multiprocessing爲例),但我不認爲這是值得的麻煩 - 當你來到你必須欺騙你的系統做你想做的事情,有可能是你使用了錯誤的系統,或者你應該重新考慮你的設計。但是,對於信息性的緣故,這裏是做你想做的事,一個多設置一個辦法:

import sys 
from multiprocessing import Pool 

def parallel_call(params): # a helper for calling 'remote' instances 
    cls = getattr(sys.modules[__name__], params[0]) # get our class type 
    instance = cls.__new__(cls) # create a new instance without invoking __init__ 
    instance.__dict__ = params[1] # apply the passed state to the new instance 
    method = getattr(instance, params[2]) # get the requested method 
    args = params[3] if isinstance(params[3], (list, tuple)) else [params[3]] 
    return method(*args) # expand arguments, call our method and return the result 

class A(object): 

    def __init__(self, vl): 
     self.vl = vl 

    def cal(self, nb): 
     return nb * self.vl 

    def run(self, dt): 
     t = Pool(processes=4) 
     rs = t.map(parallel_call, self.prepare_call("cal", dt)) 
     t.close() 
     return rs 

    def prepare_call(self, name, args): # creates a 'remote call' package for each argument 
     for arg in args: 
      yield [self.__class__.__name__, self.__dict__, name, arg] 

if __name__ == "__main__": # important protection for cross-platform use 
    a = A(2) 
    print(a.run(list(range(10)))) 
    # prints: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18] 

我認爲這是非常自我解釋它是如何工作的,但總之它通過你的類的名稱,其當前狀態(沒有信號,壽),被稱爲一個期望的方法和參數與一個parallel_call函數被稱爲在Pool每個過程調用它。蟒自動泡菜和unpickles所有這些數據,因此所有parallel_call需要做的是重建原始對象,找到在它的期望的方法和使用所提供的參數(或多個)調用它。

這樣,我們只傳遞數據而不嘗試傳遞活動對象,所以Python不會抱怨(在這種情況下,嘗試將實例方法的引用添加到類參數並查看會發生什麼情況)以及一切正常。

如果你想要沉迷於'魔法',你可以使它看起來完全像你的代碼(創建你自己的Pool處理程序,從函數中取出名稱並將名稱發送到實際進程等),但是這個應該爲您的示例提供足夠的功能。但是,在您提出自己的希望之前,請記住,只有在共享「靜態」實例(一旦您在多處理環境中調用該實例後,它不會更改其初始狀態的實例)時,它纔會起作用。如果A.cal方法要更改vl屬性的內部狀態 - 它只會影響其更改的實例(除非主要實例在調用之間調用Pool時發生更改)。如果你想分享狀態,你可以升級parallel_call在通話結束後拿起instance.__dict__,並將其與方法調用結果一起返回,然後在主叫方,你必須更新本地__dict__,返回的數據爲改變原來的狀態。這還不夠 - 你實際上必須創建一個共享字典並處理所有的互斥體工作人員讓它可以被所有進程同時訪問(你可以使用multiprocessing.Manager)。

所以,我剛纔說了,超過其價值的麻煩......

+0

所以在我的情況下,沒有解決方案,我們可以使用一個類方法在類中使用共享內存來垃圾多個進程,對吧?是否正確的方法只能在課堂之外帶來產卵方法? – Gotte

+0

有一個解決方案,我剛剛更新了我的答案。它很笨重,但並不是那麼好,但它的工作原理......然後,我會建議重新考慮你的設計,這樣你就不必處理共享實例的狀態。沒有理由讓自己比自己更難... – zwer

+0

謝謝你的幫助。我加了你的代碼,它運行完美,但我認爲我應該重新設計我的代碼,就像你的建議:) – Gotte

0

問題:似乎沒有工作(因爲我只有1個進程正在運行任務管理器中看到的) 並且結果從未交付。

只看到1過程Pool計算用於處理的數目如下:
你給range(10) =任務索引0..9,因此Pool計算(10/4) * 4 = 8+1 = 9
在開始第一個process之後,不再有任務了。
使用range(32),您將看到 process正在運行。

您將返回return t,而不是返回rs = pool.map(...的結果。


這將工作,例如

def cal(self, nb): 
    import os 
    print('pid:{} cal({})'.format(os.getpid(), nb)) 
    return nb * self.vl 

def run(self,df): 
    with mp.Pool(processes=4) as pool: 
     rs = pool.map(self.cal, df) 
    pool.close() 
    return rs 

if __name__ == '__main__': 
    a = A(2) 
    result = a.run(list(range(32))) 
    print(result) 

與Python測試:3.4.2