2011-12-15 94 views
10

我使用multiprocessing.Pool()multiprocessing.pool.map和功能兩個參數

這裏是我想池:

def insert_and_process(file_to_process,db): 
    db = DAL("path_to_mysql" + db) 
    #Table Definations 
    db.table.insert(**parse_file(file_to_process)) 
    return True 

if __name__=="__main__": 
    file_list=os.listdir(".") 
    P = Pool(processes=4) 
    P.map(insert_and_process,file_list,db) # here having problem. 

我想傳遞兩個參數 我想要做的是初始化只有4個DB連接(這裏將嘗試在每個函數調用上創建連接,因此可能會導致數百萬個連接並導致IO凍結死亡)。如果我可以創建4個數據庫連接和每個進程1,它會好的。

Pool有沒有解決方案?或者我應該放棄它?

編輯:

從雙方你的幫助,我通過這樣得到這個:

args=zip(f,cycle(dbs)) 
Out[-]: 
[('f1', 'db1'), 
('f2', 'db2'), 
('f3', 'db3'), 
('f4', 'db4'), 
('f5', 'db1'), 
('f6', 'db2'), 
('f7', 'db3'), 
('f8', 'db4'), 
('f9', 'db1'), 
('f10', 'db2'), 
('f11', 'db3'), 
('f12', 'db4')] 

所以在這裏它是如何要去工作,我該怎麼移動DB連接代碼到主水平這樣做:

def process_and_insert(args): 

    #Table Definations 
    args[1].table.insert(**parse_file(args[0])) 
    return True 

if __name__=="__main__": 
    file_list=os.listdir(".") 
    P = Pool(processes=4) 

    dbs = [DAL("path_to_mysql/database") for i in range(0,3)] 
    args=zip(file_list,cycle(dbs)) 
    P.map(insert_and_process,args) # here having problem. 

是的,我會測試它,讓你們知道。

回答

26

Pool文檔沒有的傳遞多個參數的目標函數的方式說 - 我已經試過只是路過的序列,但沒有得到展開(序列中的一個項目爲每個參數)。

然而,你可以寫你的目標函數期望第一個(也是唯一的)參數是一個元組,其中每個元素是你期望的參數之一:

from itertools import repeat 

def insert_and_process((file_to_process,db)): 
    db = DAL("path_to_mysql" + db) 
    #Table Definations 
    db.table.insert(**parse_file(file_to_process)) 
    return True 

if __name__=="__main__": 
    file_list=os.listdir(".") 
    P = Pool(processes=4) 
    P.map(insert_and_process,zip(file_list,repeat(db))) 

(請注意多括號中的insert_and_process - python把它當作一個單參數,應該是一個2項序列,序列的第一個元素歸屬於第一個變量,另一個歸入第二個變量)

8

您的池將產生四個進程,每個進程都由它自己的Python解釋器實例運行。您可以使用全局變量來保存你的數據庫連接對象,所以只有一個連接每個進程創建:

global_db = None 

def insert_and_process(file_to_process, db): 
    global global_db 
    if global_db is None: 
     # If this is the first time this function is called within this 
     # process, create a new connection. Otherwise, the global variable 
     # already holds a connection established by a former call. 
     global_db = DAL("path_to_mysql" + db) 
    global_db.table.insert(**parse_file(file_to_process)) 
    return True 

由於Pool.map()和朋友只支持一個參數的工人功能,你需要創建一個包裝轉發工作:

def insert_and_process_helper(args): 
    return insert_and_process(*args) 

if __name__ == "__main__": 
    file_list=os.listdir(".") 
    db = "wherever you get your db" 
    # Create argument tuples for each function call: 
    jobs = [(file, db) for file in file_list] 
    P = Pool(processes=4) 
    P.map(insert_and_process_helper, jobs) 
+0

由於費迪南德,這是接近我想要的。我想要做的是創建4個數據庫連接。每個進程有一個連接,但不是每個函數調用。 `DAL(「數據庫路徑」)將創建一個數據庫連接。一次連接將比四連接慢。 – 2011-12-16 09:30:10

+0

我試過的例子,當函數沒有返回它工作得很好...;我們不能做像my_var = P.map(insert_and_process_helper,jobs)的東西嗎? – neverMind 2013-08-07 05:27:24

5

不需要使用zip。例如,如果你有2個參數,x和y,和他們每個人可以得到一些值,比如:

X=range(1,6) 
Y=range(10) 

的功能應該得到只有一個參數,而裏面解壓:

def func(params): 
    (x,y)=params 
    ... 

你叫它這樣的:

params = [(x,y) for x in X for y in Y] 
pool.map(func, params) 
2

使用

params=[(x,y) for x in X for y in Y] 

您創建的xy的完整拷貝,並且可能比使用

from itertools import repeat 
P.map(insert_and_process,zip(file_list,repeat(db))) 
1

慢,您可以使用

from functools import partial 

庫用於這一目的

func = partial(rdc, lat, lng) 
r = pool.map(func, range(8)) 

def rdc(lat,lng,x): 
    pass