2016-09-15 37 views
1

以下不能使用python 2.7.9,但也不會拋出任何錯誤或異常。有沒有一個錯誤,或者不能在一個類中使用多處理?Python多處理沉默失敗類

from multiprocessing import Pool 

def testNonClass(arg): 
    print "running %s" % arg 
    return arg 

def nonClassCallback(result): 
    print "Got result %s" % result 

class Foo: 
    def __init__(self): 
     po = Pool() 
     for i in xrange(1, 3): 
      po.apply_async(self.det, (i,), callback=self.cb) 
     po.close() 
     po.join() 
     print "done with class" 
    po = Pool() 
    for i in xrange(1, 3): 
     po.apply_async(testNonClass, (i,), callback=nonClassCallback) 
    po.close() 
    po.join() 

    def cb(self, r): 
     print "callback with %s" % r 

    def det(self, M): 
     print "method" 
     return M+2 


if __name__ == "__main__": 
    Foo() 

運行打印此:

done with class 
running 1 
running 2 
Got result 1 
Got result 2 

編輯:這似乎是相關的,但它使用.map,而我特別感到需要使用apply_async這似乎在多與類的實例是如何工作方面關係(例如,我沒有picklnig錯誤,就像很多其他相關問題一樣) - Python how to do multiprocessing inside of a class?

+0

你爲什麼要創建兩個不同的池?在類塊中創建池可能不是一個好主意。你可以在__init__或__name__ ==「__main __」塊中完成。當你說「不行」時,你的意思是什麼? –

回答

0

我很確定它可以在課堂上使用,但您需要保護對的呼叫的條款內,如:

if name == "__main__":

,使其只能被調用在主線程。您可能還必須更改該類的__init__函數,以便它接受池作爲參數而不是創建池。

我只是嘗試這樣做

from multiprocessing import Pool 

#global counter 
#counter = 0 

class Foo: 
    def __init__(self, po): 
     for i in xrange(1, 300): 
      po.apply_async(self.det, (i,), callback=self.cb) 
     po.close() 
     po.join() 
     print("foo") 
     #print counter 

    def cb(self, r): 
     #global counter 
     #print counter, r 
     counter += 1 


    def det(self, M): 
     return M+2 


if __name__ == "__main__": 
    po = Pool() 
    Foo(po) 

,我想我知道是什麼問題了。 Python不是多線程的;全局解釋器鎖可以防止這一點。 Python正在使用多個進程,因此池中的子進程無法訪問主進程的標準輸出。

子流程也無法修改變量counter,因爲它存在於一個不同的進程中(我嘗試運行counter行註釋掉和未註釋)。現在,我記得看到全局狀態變量被池中的進程改變的情況,所以我不知道所有的細節。我知道一般來說,擁有這樣的全局狀態變量是一個糟糕的主意,如果沒有其他原因,它們可能導致競爭條件和/或浪費時間鎖定並等待訪問全局變量。

+0

好趕上,但結果是即使這與應用 – Hamy

+1

謝謝。我刪除了計數器並添加了一個例子,證明子進程的輸出不會導致失敗。問題仍然存在 - 它使用導致此故障的類方法 – Hamy

1

默認情況下,進程不共享狀態或內存,每個進程都是一個獨立的程序。您需要1)使用線程2)使用specific types capable of sharing state或3)設計您的程序以避免共享狀態,而是依賴返回值。

更新

在代碼中有兩個問題,一個是屏蔽另一個問題。

1)您不會對apply_async的結果做任何事情,我看到您使用的是回調函數,但仍需要捕獲結果並處理它們。因爲你沒有這樣做,所以你沒有看到由第二個問題引起的錯誤。

2)一個對象的方法不能傳遞給其他進程......當我第一次發現它時,我真的非常惱火,但是有一個簡單的解決方法。試試這個:

from multiprocessing import Pool 

def _remote_det(foo, m): 
    return foo.det(m) 

class Foo: 
    def __init__(self): 
     pass 
     po = Pool() 
     results = [] 
     for i in xrange(1, 3): 
      r = po.apply_async(_remote_det, (self, i,), callback=self.cb) 
      results.append(r) 

     po.close() 
     for r in results: 
      r.wait() 
      if not r.successful(): 
       # Raises an error when not successful 
       r.get() 

     po.join() 
     print "done with class" 

    def cb(self, r): 
     print "callback with %s" % r 

    def det(self, M): 
     print "method" 
     return M+2 

if __name__ == "__main__": 
    Foo() 
+0

抱歉,狀態共享不是問題(雖然在我的示例中它是一個錯誤)。更新的questino顯示相同的問題,沒有使用任何全球計數器 – Hamy

+0

@Hamy,看我的更新。 –