2014-01-15 20 views
2

我使用python的多處理模塊在linux平臺上編寫了一個腳本。當我嘗試在Windows上運行該程序時,這不是直接工作,我發現這與Windows上生成子進程的事實有關。使用的物體可以被酸洗似乎是至關重要的。與大型數組的Windows上的Python多處理

我的主要問題是,我使用大型numpy數組。看起來,他們有一定的規模,他們不能再選擇。要打破它以一個簡單的腳本,我想要做這樣的事情:

### Import modules 

import numpy as np 
import multiprocessing as mp 

number_of_processes = 4 

if __name__ == '__main__': 

    def reverse_np_array(arr): 
     arr = arr + 1 
     return arr 

    a = np.ndarray((200,1024,1280),dtype=np.uint16) 

    def put_into_queue(_Queue,arr): 
     _Queue.put(reverse_np_array(arr)) 


    Queue_list = [] 
    Process_list = [] 
    list_of_arrays = [] 

    for i in range(number_of_processes): 
     Queue_list.append(mp.Queue()) 


    for i in range(number_of_processes): 
     Process_list.append(mp.Process(target=put_into_queue, args=(Queue_list[i],a))) 

    for i in range(number_of_processes): 
     Process_list[i].start() 

    for i in range(number_of_processes): 
     list_of_arrays.append(Queue_list[i].get()) 

    for i in range(number_of_processes): 
     Process_list[i].join() 

我收到以下錯誤信息:

Traceback (most recent call last): 
    File "Windows_multi.py", line 34, in <module> 
    Process_list[i].start() 
    File "C:\Program Files\Anaconda32\lib\multiprocessing\process.py", line 130, i 
n start 
    self._popen = Popen(self) 
    File "C:\Program Files\Anaconda32\lib\multiprocessing\forking.py", line 277, i 
n __init__ 
    dump(process_obj, to_child, HIGHEST_PROTOCOL) 
    File "C:\Program Files\Anaconda32\lib\multiprocessing\forking.py", line 199, i 
n dump 
    ForkingPickler(file, protocol).dump(obj) 
    File "C:\Program Files\Anaconda32\lib\pickle.py", line 224, in dump 
    self.save(obj) 
    File "C:\Program Files\Anaconda32\lib\pickle.py", line 331, in save 
    self.save_reduce(obj=obj, *rv) 
    File "C:\Program Files\Anaconda32\lib\pickle.py", line 419, in save_reduce 
    save(state) 
    File "C:\Program Files\Anaconda32\lib\pickle.py", line 286, in save 
    f(self, obj) # Call unbound method with explicit self 
    File "C:\Program Files\Anaconda32\lib\pickle.py", line 649, in save_dict 
    self._batch_setitems(obj.iteritems()) 

所以我基本上是建立一個大陣,我需要在所有進程都使用此數組進行計算並將其返回。

一個重要的事情似乎是寫的函數的定義,整個事情是工作的聲明if __name__ = '__main__':

之前,如果我減少陣列(50,1024,1280)。 但是,即使啓動了4個進程並且4個內核正在工作,它也比僅爲一個內核(在Windows上)不編寫代碼而沒有多處理代碼慢。所以我認爲我在這裏有另一個問題。

以後我真正的程序中的函數是在一個cython模塊中。

我使用python 32-bit的anaconda軟件包,因爲我無法使用64位版本編譯我的cython軟件包(我會在不同的線程中詢問這個問題)。

任何幫助,歡迎!

謝謝! 菲利普

UPDATE:

我做的第一個錯誤是在巡航能力了 「put_into_queue」 功能的__main__定義。

然後我按照建議引入了共享數組,但是,使用了大量的內存,並且使用的內存隨着我使用的進程而擴展(當然這不應該是這種情況)。 任何想法我在這裏做錯了嗎?在我放置共享數組的定義(在__main__之內或之外)似乎並不重要,但我認爲它應該在__main__之內。從這篇文章中得知:Is shared readonly data copied to different processes for Python multiprocessing?

import numpy as np 
import multiprocessing as mp 
import ctypes 


shared_array_base = mp.Array(ctypes.c_uint, 1280*1024*20) 
shared_array = np.ctypeslib.as_array(shared_array_base.get_obj()) 
#print shared_array 
shared_array = shared_array.reshape(20,1024,1280) 

number_of_processes = 4 

def put_into_queue(_Queue,arr): 
    _Queue.put(reverse_np_array(arr)) 
def reverse_np_array(arr): 
    arr = arr + 1 + np.random.rand() 
    return arr 
if __name__ == '__main__': 


    #print shared_arra 

    #a = np.ndarray((50,1024,1280),dtype=np.uint16) 


    Queue_list = [] 
    Process_list = [] 
    list_of_arrays = [] 

    for i in range(number_of_processes): 
     Queue_list.append(mp.Queue()) 


    for i in range(number_of_processes): 
     Process_list.append(mp.Process(target=put_into_queue, args=(Queue_list[i],shared_array))) 

    for i in range(number_of_processes): 
     Process_list[i].start() 

    for i in range(number_of_processes): 
     list_of_arrays.append(Queue_list[i].get()) 

    for i in range(number_of_processes): 
     Process_list[i].join() 
+0

這個問題的答案對你有幫助嗎? http://stackoverflow.com/a/14593135/513688這個想法是創建父母和孩子都可以寫入的共享數組,而不是使用酸洗。 – Andrew

+0

嗨,感謝您的答案,我試圖使用共享陣列,但它不工作,見上文。有誰知道爲什麼?乾杯 – Fips

+0

您正在將共享數組放入隊列中。鏈接的例子不這樣做。從一個工作示例開始,驗證它是否有效,並進行小的更改,直到它停止表現出您想要的/期望的行爲。 – Andrew

回答

0

您沒有包含完整的追溯;最終是最重要的。在我的32位的Python,我得到的是終於結束在

File "C:\Python27\lib\pickle.py", line 486, in save_string 
    self.write(BINSTRING + pack("<i", n) + obj) 
MemoryError 

MemoryError是例外一樣回溯,它說你跑出來的內存。

64位Python會解決這個問題,但在進程間發送大量數據很容易成爲multiprocessing中的一個嚴重瓶頸。

+0

感謝您的回覆! 是的,你是對的。但我現在怎麼解決這個問題呢?必須有一個優雅的方式來處理這個問題,我認爲人們處理更大的陣列。由於我只向陣列發送一次(向前和向後),所以它不會成爲我的情況下的瓶頸。 – Fips

+0

@Fips可能的解決方案是[在共享內存中使用numpy數組](http://stackoverflow.com/q/7894791/222914) –