2013-04-07 117 views
10

我有一個自定義對象的大型數組,我需要執行獨立(可並行)的任務,包括修改對象參數。我試過使用管理器().dict和'sharedmem'ory,但都沒有工作。例如:在python多處理中修改對象

import numpy as np 
import multiprocessing as mp 
import sharedmem as shm 


class Tester: 

    num = 0.0 
    name = 'none' 
    def __init__(self,tnum=num, tname=name): 
     self.num = tnum 
     self.name = tname 

    def __str__(self): 
     return '%f %s' % (self.num, self.name) 

def mod(test, nn): 
    test.num = np.random.randn() 
    test.name = nn 


if __name__ == '__main__': 

    num = 10 

    tests = np.empty(num, dtype=object) 
    for it in range(num): 
     tests[it] = Tester(tnum=it*1.0) 

    sh_tests = shm.empty(num, dtype=object) 
    for it in range(num): 
     sh_tests[it] = tests[it] 
     print sh_tests[it] 

    print '\n' 
    workers = [ mp.Process(target=mod, args=(test, 'some')) for test in sh_tests ] 

    for work in workers: work.start() 

    for work in workers: work.join() 

    for test in sh_tests: print test 

打印出:

0.000000 none 
1.000000 none 
2.000000 none 
3.000000 none 
4.000000 none 
5.000000 none 
6.000000 none 
7.000000 none 
8.000000 none 
9.000000 none 


0.000000 none 
1.000000 none 
2.000000 none 
3.000000 none 
4.000000 none 
5.000000 none 
6.000000 none 
7.000000 none 
8.000000 none 
9.000000 none 

即對象沒有被修改。

我該如何達到理想的行爲?

+0

的http://計算器。com/questions/10721915/shared-memory-objects-in-python-multiprocessing – tacaswell 2013-04-07 03:24:52

+0

你能發佈一個鏈接到'sharedmem'我似乎無法找到任何東西。 – tacaswell 2013-04-07 03:25:12

回答

8

問題是,當對象被傳遞給工作進程時,它們被包裝在泡菜中,運到另一個進程中,在那裏它們被解壓縮並進行工作。你的物體並沒有像克隆的那樣傳遞給另一個過程。您不會返回對象,因此克隆的對象會被愉快地修改,然後被丟棄。

看起來好像不能直接完成(Python: Possible to share in-memory data between 2 separate processes)。

你可以做的是返回修改的對象。

import numpy as np 
import multiprocessing as mp 



class Tester: 

    num = 0.0 
    name = 'none' 
    def __init__(self,tnum=num, tname=name): 
     self.num = tnum 
     self.name = tname 

    def __str__(self): 
     return '%f %s' % (self.num, self.name) 

def mod(test, nn, out_queue): 
    print test.num 
    test.num = np.random.randn() 
    print test.num 
    test.name = nn 
    out_queue.put(test) 




if __name__ == '__main__':  
    num = 10 
    out_queue = mp.Queue() 
    tests = np.empty(num, dtype=object) 
    for it in range(num): 
     tests[it] = Tester(tnum=it*1.0) 


    print '\n' 
    workers = [ mp.Process(target=mod, args=(test, 'some', out_queue)) for test in tests ] 

    for work in workers: work.start() 

    for work in workers: work.join() 

    res_lst = [] 
    for j in range(len(workers)): 
     res_lst.append(out_queue.get()) 

    for test in res_lst: print test 

這確實會導致有趣的觀察,由於產生了過程是相同的,他們都用隨機數相同的種子開始,所以他們所有產生相同的「隨機」數。

+0

行''工人= [mp.Process(...''看起來像你開始''num''進程(所有在同一時間?)在你的例子中,它只有十個,但你會怎麼樣將其應用於包含數千或數百萬條目(因此也包含工作人員)的大型數組? – 2017-09-16 13:44:31

3

我沒有看到你將shm引用傳遞給子進程,所以我沒有看到他們如何完成工作可以寫回共享內存。也許我在這裏錯過了一些東西。

或者,您是否考慮過numpy.memmap? (BTW:tcaswell,這裏提到的模塊似乎是:numpy-sharedmem)。

你也可能需要閱讀Sturla莫爾登的Using Python, multiprocessing and NumPy/SciPy for parallel numerical computing(PDF)在unutbu的回答建議[StackOverflow的:我如何通過蟒子流程之間的大numpy的陣列,而不保存到磁盤?]和(How do I pass large numpy arrays between python subprocesses without saving to disk?)。和喬金頓的StackOverflow: NumPy vs. multiprocessing and mmap

這些可能比直接相關更鼓舞人心。

+0

感謝您的指點。看到它,但不確定它是否是正確的包。 – tacaswell 2013-04-07 05:36:13

+0

+1爲好的鏈接集合! – tacaswell 2013-04-07 05:48:20

3

您的代碼不會嘗試修改共享內存。它只是克隆單個物體。

dtype=object意味着sharedmem將不起作用由於概述in the link provided by @tcaswell原因:

共享對象圖,其包括引用/指向其他對象的基本上是不可行的

對於純(值)您可以使用共享內存的類型,請參閱Use numpy array in shared memory for multiprocessing

manager方法也應該工作(它只是複製周圍的對象):

import random 
from multiprocessing import Pool, Manager 

class Tester(object): 
    def __init__(self, num=0.0, name='none'): 
     self.num = num 
     self.name = name 

    def __repr__(self): 
     return '%s(%r, %r)' % (self.__class__.__name__, self.num, self.name) 

def init(L): 
    global tests 
    tests = L 

def modify(i_t_nn): 
    i, t, nn = i_t_nn 
    t.num += random.normalvariate(mu=0, sigma=1) # modify private copy 
    t.name = nn 
    tests[i] = t # copy back 
    return i 

def main(): 
    num_processes = num = 10 #note: num_processes and num may differ 
    manager = Manager() 
    tests = manager.list([Tester(num=i) for i in range(num)]) 
    print(tests[:2]) 

    args = ((i, t, 'some') for i, t in enumerate(tests)) 
    pool = Pool(processes=num_processes, initializer=init, initargs=(tests,)) 
    for i in pool.imap_unordered(modify, args): 
     print("done %d" % i) 
    pool.close() 
    pool.join() 
    print(tests[:2]) 

if __name__ == '__main__': 
    main()