我使用python的多處理模塊在linux平臺上編寫了一個腳本。當我嘗試在Windows上運行該程序時,這不是直接工作,我發現這與Windows上生成子進程的事實有關。使用的物體可以被酸洗似乎是至關重要的。與大型數組的Windows上的Python多處理
我的主要問題是,我使用大型numpy數組。看起來,他們有一定的規模,他們不能再選擇。要打破它以一個簡單的腳本,我想要做這樣的事情:
### Import modules
import numpy as np
import multiprocessing as mp
number_of_processes = 4
if __name__ == '__main__':
def reverse_np_array(arr):
arr = arr + 1
return arr
a = np.ndarray((200,1024,1280),dtype=np.uint16)
def put_into_queue(_Queue,arr):
_Queue.put(reverse_np_array(arr))
Queue_list = []
Process_list = []
list_of_arrays = []
for i in range(number_of_processes):
Queue_list.append(mp.Queue())
for i in range(number_of_processes):
Process_list.append(mp.Process(target=put_into_queue, args=(Queue_list[i],a)))
for i in range(number_of_processes):
Process_list[i].start()
for i in range(number_of_processes):
list_of_arrays.append(Queue_list[i].get())
for i in range(number_of_processes):
Process_list[i].join()
我收到以下錯誤信息:
Traceback (most recent call last):
File "Windows_multi.py", line 34, in <module>
Process_list[i].start()
File "C:\Program Files\Anaconda32\lib\multiprocessing\process.py", line 130, i
n start
self._popen = Popen(self)
File "C:\Program Files\Anaconda32\lib\multiprocessing\forking.py", line 277, i
n __init__
dump(process_obj, to_child, HIGHEST_PROTOCOL)
File "C:\Program Files\Anaconda32\lib\multiprocessing\forking.py", line 199, i
n dump
ForkingPickler(file, protocol).dump(obj)
File "C:\Program Files\Anaconda32\lib\pickle.py", line 224, in dump
self.save(obj)
File "C:\Program Files\Anaconda32\lib\pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "C:\Program Files\Anaconda32\lib\pickle.py", line 419, in save_reduce
save(state)
File "C:\Program Files\Anaconda32\lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "C:\Program Files\Anaconda32\lib\pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
所以我基本上是建立一個大陣,我需要在所有進程都使用此數組進行計算並將其返回。
一個重要的事情似乎是寫的函數的定義,整個事情是工作的聲明if __name__ = '__main__':
之前,如果我減少陣列(50,1024,1280)。 但是,即使啓動了4個進程並且4個內核正在工作,它也比僅爲一個內核(在Windows上)不編寫代碼而沒有多處理代碼慢。所以我認爲我在這裏有另一個問題。
以後我真正的程序中的函數是在一個cython模塊中。
我使用python 32-bit的anaconda軟件包,因爲我無法使用64位版本編譯我的cython軟件包(我會在不同的線程中詢問這個問題)。
任何幫助,歡迎!
謝謝! 菲利普
UPDATE:
我做的第一個錯誤是在巡航能力了 「put_into_queue」 功能的__main__
定義。
然後我按照建議引入了共享數組,但是,使用了大量的內存,並且使用的內存隨着我使用的進程而擴展(當然這不應該是這種情況)。 任何想法我在這裏做錯了嗎?在我放置共享數組的定義(在__main__
之內或之外)似乎並不重要,但我認爲它應該在__main__
之內。從這篇文章中得知:Is shared readonly data copied to different processes for Python multiprocessing?
import numpy as np
import multiprocessing as mp
import ctypes
shared_array_base = mp.Array(ctypes.c_uint, 1280*1024*20)
shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
#print shared_array
shared_array = shared_array.reshape(20,1024,1280)
number_of_processes = 4
def put_into_queue(_Queue,arr):
_Queue.put(reverse_np_array(arr))
def reverse_np_array(arr):
arr = arr + 1 + np.random.rand()
return arr
if __name__ == '__main__':
#print shared_arra
#a = np.ndarray((50,1024,1280),dtype=np.uint16)
Queue_list = []
Process_list = []
list_of_arrays = []
for i in range(number_of_processes):
Queue_list.append(mp.Queue())
for i in range(number_of_processes):
Process_list.append(mp.Process(target=put_into_queue, args=(Queue_list[i],shared_array)))
for i in range(number_of_processes):
Process_list[i].start()
for i in range(number_of_processes):
list_of_arrays.append(Queue_list[i].get())
for i in range(number_of_processes):
Process_list[i].join()
這個問題的答案對你有幫助嗎? http://stackoverflow.com/a/14593135/513688這個想法是創建父母和孩子都可以寫入的共享數組,而不是使用酸洗。 – Andrew
嗨,感謝您的答案,我試圖使用共享陣列,但它不工作,見上文。有誰知道爲什麼?乾杯 – Fips
您正在將共享數組放入隊列中。鏈接的例子不這樣做。從一個工作示例開始,驗證它是否有效,並進行小的更改,直到它停止表現出您想要的/期望的行爲。 – Andrew