2012-09-21 72 views
2

我想通過多個進程更新一個通用字典。你能幫我找出這個代碼有什麼問題嗎?我得到以下輸出:python多進程更新詞典同步

inside function 
{1: 1, 2: -1} 
comes here 
inside function 
{1: 0, 2: 2} 
comes here 
{1: 0, 2: -1} 

謝謝。

from multiprocessing import Lock, Process, Manager 

l= Lock() 


def computeCopyNum(test,val): 
    l.acquire() 
    test[val]=val 
    print "inside function" 
    print test 
    l.release() 
    return 

a=dict({1: 0, 2: -1}) 

procs=list() 

for i in range(1,3): 
    p = Process(target=computeCopyNum, args=(a,i)) 
    procs.append(p) 
    p.start() 

for p in procs: 
p.join() 
    print "comes here" 

print a 

回答

0

進程不像線程那樣共享內存。每個過程都以它自己的獨立副本結束。如果您想在不同的線程中工作,則需要使用管道或其他進程間通信來將數據返回到中央進程。

4

您導入Manager,但您對此不做任何處理。作爲第一種方法,而是執行此操作:

a = Manager().dict({1: 0, 2: -1}) 

全局變量是行不通的,你使用multiprocessing時所期望的方式。子進程只能訪問副本,並且它們所做的更改在它們退出時被遺忘,除非您使用的是專門設計的對象,能夠在進程之間傳播信息。

在進程間傳遞數據有多種不同的選擇,但使用上面的Manager對象通常是最簡單的。您還可以使用Manager對象來創建多個共享對象:

manager = Manager() 
a = manager.dict({1: 0, 2: -1}) 
b = manager.list((1, 2, 3)) 

Manager文檔的更多。

此外,您使用的鎖是不必要的。 Manager負責爲您服務。作爲docs說,

Generally synchronization primitives are not as necessary in a multiprocess program as they are in a multithreaded program.

+0

@ user1050325,你說你使用'Queue'得到了一個錯誤 - 你試過'Manager'方法嗎?對於基本的進程間通信,'Managers'完全沒問題。請讓我知道這對你有沒有用。 – senderle

+0

令人驚歎和簡單的解決方案。 –

7

答案其實很簡單。您正在使用多處理模塊,您可以使用它啓動幾個不同的python進程。不同的進程具有不同的地址空間,並且它們不共享內存,所以您的所有進程都會寫入其自己的本地字典副本。

使用多處理模塊進行進程間通信的最簡單方法是使用隊列在從進程和主進程之間進行通信。

from multiprocessing import Process, Queue 

def computeCopyNum(queue, val): 
    queue.put(val) # can also put a tuple of thread-id and value if we would like to 

procs=list() 

queue = Queue() 
for i in range(1,3): 
    p = Process(target=computeCopyNum, args=(queue, i)) 
    procs.append(p) 
    p.start() 

for _ in procs: 
    val = queue.get() 
    # do whatever with val 

for p in procs: 
    p.join() 

如果每個從屬進程可以生成它可能是明智的讓每個從屬進程一個定點值寫入隊列的信號到主,它的完成多個輸出值。然後代碼可能如下所示:

def slave(queue): 
    for i in range(128): # just for example 
     val = #some calculated result 
     queue.put(val) 

    queue.put(None) # add a sentinel value to tell the master we're done 

queue = Queue() 

# spawn 32 slave processes 
num_procs = 32 
procs = [Process(target=slave, args=(queue,)) for _ in range(num_procs)] 
for proc in procs: 
    proc.start() 

finished = 0 
while finished < num_procs: 
    item = queue.get() 
    if item is None: 
     finished += 1 
    else: 
     # do something with item 

for proc in procs: 
    proc.join() 

您也可以使用管理器,如另一個答案中所示。這種方法的問題在於進程地址空間之間可能會發生大量隱式內存複製,這很難推理。我總是喜歡使用顯式隊列。

+0

+1,關於內存複製的好處。不過,出於教學方面的原因,我認爲「經理人」對於初學者來說是一個更好的起點。 – senderle

+0

當我使用隊列時,出現以下錯誤:斷開__THE_PROCESS_HAS_FORKED_AND_YOU_CANNOT_USE_THIS_COREFOUNDATION_FUNCTIONALITY ___ YOU_MUST_EXEC __()進行調試。 該進程已分叉,並且無法安全使用此CoreFoundation功能。你必須執行()。 – user1050325

+0

你能發佈你的代碼嗎?我驗證了我發佈的代碼的工作,如果你只是複製第一個樣本逐字它應該工作。 – dnaq