2012-04-12 29 views
31

我正在嘗試使用多進程池對象。我希望每個進程在啓動時都打開數據庫連接,然後使用該連接來處理傳入的數據(而不是打開和關閉每個數據位的連接)。這似乎是初始化程序的作用因爲,但我無法圍繞工作人員和初始化程序如何溝通。所以我有這樣的事情:如何使用初始化設置我的多進程池?

def get_cursor(): 
    return psycopg2.connect(...).cursor() 

def process_data(data): 
    # here I'd like to have the cursor so that I can do things with the data 

if __name__ == "__main__": 
    pool = Pool(initializer=get_cursor, initargs=()) 
    pool.map(process_data, get_some_data_iterator()) 

我怎麼(或我)從get_cursor()返回光標到process_data()?

回答

63

初始化功能是因此被稱爲:

def worker(...): 
    ... 
    if initializer is not None: 
     initializer(*args) 

所以保存任何地方沒有返回值。你可能認爲這會讓你失望,但不是!每個工人都在一個獨立的過程中。因此,您可以使用普通的global變量。

這不完全是漂亮,但它的工作原理:

cursor = None 
def set_global_cursor(...): 
    global cursor 
    cursor = ... 

現在你可以在你的process_data功能只需使用cursor。每個獨立進程中的變量cursor與所有其他進程是分開的,因此它們不會互相踩在一起。

(我不知道psycopg2是否有不同的方式來解決這個問題不涉及在第一時間使用multiprocessing;這意味着作爲一個籠統的回答與multiprocessing模塊的一個普遍問題)

+8

這應該是接受的答案 – thias 2014-01-15 14:12:32

+0

@torek應該在init_worker中調用set_global_cursor嗎? – 2015-06-13 07:06:51

+0

@TheUnfunCat:不知道'init_worker'是什麼(我在你的答案中看到一個,但在原始問題中沒有)我不能確定地說。總的想法是允許'多進程。Pool「創建一個進程池,並讓每個進程創建(它自己的私有副本)數據庫連接。如果你想在池進程啓動時發生這種情況,你可以使用初始化函數。如果您希望稍後發生,可以稍後再做。無論哪種方式,你需要一個持久變量,就像你的方法中的'function.cursor'或普通的'global'一樣。 – torek 2015-06-14 00:40:39

4

您也可以將函數一起發送到初始化程序並在其中創建一個連接。之後,將光標添加到該功能。

def init_worker(function): 
    function.cursor = db.conn() 

現在,您可以通過function.cursor訪問數據庫而無需使用全局變量。

+1

你的進程命令是這樣的: p = Pool(initializer = init_worker,args =(func)); p.map(func,args_set); ?? – 2015-07-31 13:32:14

+0

是的,類似的東西(我記得這個工作,但一段時間沒有在相關的東西工作,所以不記得確切的細節,隨時dv或修改我的答案) – 2015-07-31 16:49:12

7

torek已經給出了一個很好的解釋,爲什麼初始化器在這種情況下不起作用。然而,我個人並不喜歡全局變量,所以我想在這裏粘貼另一個解決方案。

想法是使用一個類來包裝函數並用「全局」變量初始化類。

class Processor(object): 
    """Process the data and save it to database.""" 

    def __init__(self, credentials): 
    """Initialize the class with 'global' variables""" 
    self.cursor = psycopg2.connect(credentials).cursor() 

    def __call__(self, data): 
    """Do something with the cursor and data""" 
    self.cursor.find(data.key) 

然後用

p = Pool(5) 
p.map(Processor(credentials), list_of_data) 

調用,這樣第一個參數初始化類證書,返回類的實例,並調用地圖數據的實例。

儘管這不像全局變量解決方案那麼直截了當,但我強烈建議避免全局變量並以某種安全方式封裝變量。 (我真的希望他們可以在一天內支持lambda表達式,這會讓事情變得簡單...)

+0

我喜歡這個答案,因爲它很漂亮,但它不會重新連接列表中的每個項目? – woot 2016-05-27 06:42:07

+6

它通常很好地避免了全局變量,你可以做這樣的事情,但是你會推遲初始化'self.cursor',直到'p.map'實際上已經啓動了流程實例。也就是說,你的'__init__'只會設置爲'None','__call__'會說'如果self.cursor是None:self.cursor = ...'。最後,我們真正需要的是一個單進程單例。 – torek 2016-07-23 08:13:02

+0

您也可以嘗試使用帶有線程/進程id的映射作爲關鍵字,以便完全隔離每個線程/進程的連接。 – 2017-03-07 16:35:37

相關問題