0

我有一個Class,它在狀態中存儲一個大型的數組numpy。這導致multiprocessing.Pool變得非常緩慢。這裏有一個MRE:狀態爲大型數組的多處理

from multiprocessing import Pool 
import numpy 
import time 
from tqdm import tqdm 

class MP(object): 
    def __init__(self, mat): 
     self.mat = mat 

    def foo(self, x): 
     time.sleep(1) 
     return x*x + self.mat.shape[0] 

    def bar(self, arr): 
     results = [] 
     with Pool() as p: 
      for x in tqdm(p.imap(self.foo, arr)): 
       results.append(x) 
     return results 

if __name__ == '__main__': 
    x = numpy.arange(8) 
    mat = numpy.random.random((1,1)) 
    h = MP(mat) 
    res = h.bar(x) 
    print(res) 

我已經有了CPU 4個內核,這意味着該代碼應該(和不)運行約2秒鐘。 (tqdm顯示2秒爲進度條,這個例子沒有必要)。但是,在主程序中,如果我做mat = numpy.random.random((10000,10000)),則需要永久運行。我懷疑這是因爲Pool正在爲每個工作人員複製mat,但我不確定這是如何工作的,因爲mat處於Class的狀態,並且不直接參與imap調用。所以,我的問題是:

  1. 爲什麼會發生這種情況? (即,Pool如何在一個類中工作?它到底具體是什麼?製作了哪些副本以及通過引用傳遞了什麼?)
  2. 什麼是可行的解決方法?

編輯:修改foo做出的mat使用,這更代表我真正的問題。

+0

你的主程序中有多大'x'? –

+1

在你的主程序中,你傳遞給'p.imap'的函數是否需要'MP'的方法,或者它可以是一個未綁定的函數? –

+0

@JeremyMcGibbon好點。我想我的例子並不能很好地表達我的真正問題。所以,函數確實需要是'MP'的方法,因爲函數實際上是從'mat'中讀取的。 – ved

回答

0

如果像你說mat不直接參與imap電話,我猜一般的MP狀態未在imap調用中使用(如果是這樣,下面的評論,我會刪除這個答案) 。如果是這種情況,則應該將foo寫爲未綁定函數,而不是MP的方法。 mat現在被複制的原因是因爲foo的每個執行都需要在self中傳遞,其中包含self.mat

下執行得很快,不管墊的大小:

from multiprocessing import Pool 
import numpy 
import time 
from tqdm import tqdm 


class MP(object): 

    def __init__(self, mat): 
     self.mat = mat 

    def bar(self, arr): 
     results = [] 
     with Pool() as p: 
      for x in tqdm(p.imap(foo, arr)): 
       results.append(x) 
     return results 

def foo(x): 
    time.sleep(1) 
    return x * x 

if __name__ == '__main__': 
    x = numpy.arange(8) 
    mat = numpy.random.random((10000, 10000)) 
    h = MP(mat) 
    res = h.bar(x) 
    print(res) 

如果foo實際上並需要傳遞MP,因爲它實際上並不需要從mat閱讀,那麼有沒有辦法避免發送mat到每個處理器,而你的問題2沒有答案,除了「你不能」。但希望我已經回答了你的問題1.