2011-08-07 175 views
43

Python Process Pool non-daemonic?

Is it possible to create a non-daemonic Python Pool? I want a pool to be able to call a function that has another pool inside it. Thanks.

+0

As far as I know, no, it's not possible: all workers in the pool are daemonized, and it's not possible to __inject the dependency__. By the way, I don't understand the second part of your question, "I want a pool to be able to call a function that has another pool inside", and how that interferes with the fact that the workers are daemonized. – mouad

+1

Because if function a has a pool that runs function b, and b has a pool that runs function c, then there is a problem in b: it runs in a daemon process, and daemon processes cannot create processes. `AssertionError: daemonic processes are not allowed to have children` – Max
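The failure described in this comment is easy to reproduce. The sketch below (function names are mine, chosen for illustration) nests one `Pool.map` inside another; the inner `Pool` is created in a daemonic worker, and the resulting `AssertionError` is propagated back to the parent by `map`:

```python
import multiprocessing


def grandchild(x):
    return x + 1


def child(x):
    # Creating this Pool fails: we are inside a daemonic Pool worker,
    # and daemonic processes are not allowed to create child processes.
    with multiprocessing.Pool(1) as inner:
        return inner.map(grandchild, [x])


if __name__ == '__main__':
    with multiprocessing.Pool(1) as outer:
        try:
            outer.map(child, [1])
        except AssertionError as err:
            print(err)  # daemonic processes are not allowed to have children
```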

Answers

68

The multiprocessing.pool.Pool class creates the worker processes in its __init__ method, makes them daemonic and starts them, and it is not possible to re-set their daemon attribute to False before they are started (afterwards it's not allowed anymore). But you can create your own subclass of multiprocessing.pool.Pool (multiprocessing.Pool is just a wrapper function) and substitute your own multiprocessing.Process subclass, which is always non-daemonic, to be used for the worker processes.

Here is a full example of how to do this. The important parts are the two classes NoDaemonProcess and MyPool at the top and the calls to pool.close() and pool.join() on the MyPool instance at the end.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import multiprocessing
# We must import this explicitly, it is not imported by the top-level
# multiprocessing module.
import multiprocessing.pool
import time

from random import randint


class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    Process = NoDaemonProcess

def sleepawhile(t):
    print("Sleeping %i seconds..." % t)
    time.sleep(t)
    return t

def work(num_procs):
    print("Creating %i (daemon) workers and jobs in child." % num_procs)
    pool = multiprocessing.Pool(num_procs)

    result = pool.map(sleepawhile,
                      [randint(1, 5) for x in range(num_procs)])

    # The following is not really needed, since the (daemon) workers of the
    # child's pool are killed when the child is terminated, but it's good
    # practice to cleanup after ourselves anyway.
    pool.close()
    pool.join()
    return result

def test():
    print("Creating 5 (non-daemon) workers and jobs in main process.")
    pool = MyPool(5)

    result = pool.map(work, [randint(1, 5) for x in range(5)])

    pool.close()
    pool.join()
    print(result)

if __name__ == '__main__':
    test()
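On recent Python 3 versions, Pool obtains its worker class from a context object, so simply overriding `Process` on a Pool subclass may no longer be enough. A sketch of the same idea that passes a custom context instead (the class names `NoDaemonContext` and `NestablePool` are my own):

```python
import multiprocessing
import multiprocessing.pool


class NoDaemonProcess(multiprocessing.Process):
    # In Python 3, 'daemon' is a plain property on BaseProcess,
    # so we can override it to always report False.
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        pass  # ignore the Pool's attempt to daemonize workers


class NoDaemonContext(type(multiprocessing.get_context())):
    # The Pool asks the context for its Process class, so a context
    # whose Process is non-daemonic yields non-daemonic workers.
    Process = NoDaemonProcess


class NestablePool(multiprocessing.pool.Pool):
    def __init__(self, *args, **kwargs):
        kwargs['context'] = NoDaemonContext()
        super().__init__(*args, **kwargs)
```

Usage is identical to `MyPool` above: create a `NestablePool` for the outer pool and plain `multiprocessing.Pool` instances inside the workers.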
+0

The above code seems to hang for me. Specifically, it appears to hang at pool.close() inside work(). Is there something I'm missing? – 2012-09-24 17:39:30

+1

I just tested my code again with Python 2.7/3.2 (after fixing the "print" lines) on Linux and with Python 2.6/2.7/3.2 on OS X. Linux and Python 2.7/3.2 on OS X work fine, but the code does indeed hang with Python 2.6 on OS X (Lion). This seems to be a bug in the multiprocessing module which got fixed, but I haven't actually checked the bug tracker. –

+0

This should really be fixed in the multiprocessing module itself (an option for non-daemonic workers should be provided). Does anybody know who maintains it? –

6

The multiprocessing module has a nice interface to use pools with processes or threads. Depending on your current use case, you might consider using multiprocessing.pool.ThreadPool for your outer Pool, which will result in threads (which are allowed to spawn processes from within) as opposed to processes.

It might be limited by the GIL, but in my particular case (I tested both), the startup time of the processes from the outer Pool as created here far exceeded the solution with ThreadPool.


It's really easy to swap Processes for Threads. Read more about how to use a ThreadPool solution here or here.
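The swap this answer describes can be sketched in a few lines. The function names below are mine, chosen for illustration: the outer pool is a ThreadPool, so its workers are threads, and each one is free to create an inner process Pool:

```python
import multiprocessing
from multiprocessing.pool import ThreadPool


def square(x):
    return x * x


def outer_work(n):
    # Each thread of the outer ThreadPool may freely create a process
    # Pool: threads are not daemonic processes, so the "no children"
    # restriction does not apply here.
    with multiprocessing.Pool(2) as pool:
        return sum(pool.map(square, range(n)))


if __name__ == '__main__':
    with ThreadPool(3) as outer:
        print(outer.map(outer_work, [3, 4, 5]))  # [5, 14, 30]
```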

0

The issue I ran into was when trying to import globals between modules, causing the ProcessPool() line to get evaluated multiple times.

globals.py

from multiprocessing         import Manager, Lock
from pathos.multiprocessing  import ProcessPool
from pathos.threading        import ThreadPool

class SingletonMeta(type):
    def __new__(cls, name, bases, dict):
        dict['__deepcopy__'] = dict['__copy__'] = lambda self, *args: self
        return super(SingletonMeta, cls).__new__(cls, name, bases, dict)

    def __init__(cls, name, bases, dict):
        super(SingletonMeta, cls).__init__(name, bases, dict)
        cls.instance = None

    def __call__(cls, *args, **kw):
        if cls.instance is None:
            cls.instance = super(SingletonMeta, cls).__call__(*args, **kw)
        return cls.instance

    def __deepcopy__(self, item):
        return item.__class__.instance

class Globals(object):
    __metaclass__ = SingletonMeta
    """
    This class is a workaround for the bug: AssertionError: daemonic processes are not allowed to have children

    The root cause is that importing this file from different modules causes this file to be reevaluated each time,
    thus ProcessPool() gets re-executed inside that child thread, thus causing the daemonic-processes bug.
    """
    def __init__(self):
        print "%s::__init__()" % (self.__class__.__name__)
        self.shared_manager      = Manager()
        self.shared_process_pool = ProcessPool()
        self.shared_thread_pool  = ThreadPool()
        self.shared_lock         = Lock()  # BUG: Windows: global name 'lock' is not defined | doesn't affect cygwin

Then import safely from elsewhere in your code:

from globals import Globals 
Globals().shared_manager  
Globals().shared_process_pool 
Globals().shared_thread_pool 
Globals().shared_lock
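The snippet above is Python 2 (print statement, `__metaclass__`). A minimal Python 3 sketch of the same singleton idea, without the pathos dependency (class names are mine):

```python
class SingletonMeta(type):
    """Metaclass that caches the first instance of each class and
    returns that same instance on every subsequent call."""

    def __init__(cls, name, bases, namespace):
        super().__init__(name, bases, namespace)
        cls.instance = None

    def __call__(cls, *args, **kwargs):
        if cls.instance is None:
            cls.instance = super().__call__(*args, **kwargs)
        return cls.instance


class Globals(metaclass=SingletonMeta):
    def __init__(self):
        # Runs only once; expensive shared resources (pools, managers)
        # would be created here exactly one time per process.
        self.init_count = getattr(self, 'init_count', 0) + 1
```

Because `SingletonMeta.__call__` short-circuits after the first instantiation, repeated imports or calls all see the same object, so any pools it holds are only ever created once.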