2013-04-09 97 views
5

我不是python的專家,但我設法寫下了一個多處理代碼,它使用我的PC中所有的cpus和內核。我的代碼加載了一個非常大的數組,大約1.6 GB,並且我需要在每個進程中更新數組。幸運的是,更新包括爲圖像添加一些人造恆星,每個過程都有一組不同的圖像位置添加人造恆星。Python多處理和共享變量

圖像太大,每次調用某個進程時都無法創建新圖像。我的解決方案是在共享內存中創建一個變量,並節省大量內存。出於某種原因,它適用於圖像的90%,但是有些區域是我的代碼在我發送過程之前的某些位置添加了隨機數。它與我創建共享變量的方式有關嗎?這些進程在我的代碼執行過程中是否互相干擾?

奇怪的是,當使用一個單一的CPU和單核,圖像是100%完美,並沒有隨機數字添加到圖像。你是否建議我在多個進程之間共享一個大型數組?這裏是我的代碼的相關部分。請在我定義變量im_data時閱讀該行。

import warnings 
warnings.filterwarnings("ignore") 

from mpl_toolkits.mplot3d import Axes3D 
from matplotlib import cm 
import matplotlib.pyplot as plt 
import sys,os 
import subprocess 
import numpy as np 
import time 
import cv2 as cv 
import pyfits 
from pyfits import getheader 
import multiprocessing, Queue 
import ctypes 

class Worker(multiprocessing.Process): 


def __init__(self, work_queue, result_queue): 

    # base class initialization 
    multiprocessing.Process.__init__(self) 

    # job management stuff 
    self.work_queue = work_queue 
    self.result_queue = result_queue 
    self.kill_received = False 

def run(self): 
    while not self.kill_received: 

     # get a task 
     try: 
      i_range, psf_file = self.work_queue.get_nowait() 
     except Queue.Empty: 
      break 

     # the actual processing 
     print "Adding artificial stars - index range=", i_range 

     radius=16 
     x_c,y_c=((psf_size[1]-1)/2, (psf_size[2]-1)/2) 
     x,y=np.meshgrid(np.arange(psf_size[1])-x_c,np.arange(psf_size[2])-y_c) 
     distance = np.sqrt(x**2 + y**2) 

     for i in range(i_range[0],i_range[1]): 
      psf_xy=np.zeros(psf_size[1:3], dtype=float) 
      j=0 
      for i_order in range(psf_order+1): 
       j_order=0 
       while (i_order+j_order < psf_order+1): 
        psf_xy += psf_data[j,:,:] * ((mock_y[i]-psf_offset[1])/psf_scale[1])**i_order * ((mock_x[i]-psf_offset[0])/psf_scale[0])**j_order 
        j_order+=1 
        j+=1 


      psf_factor=10.**((30.-mock_mag[i])/2.5)/np.sum(psf_xy) 
      psf_xy *= psf_factor 

      npsf_xy=cv.resize(psf_xy,(npsf_size[0],npsf_size[1]),interpolation=cv.INTER_LANCZOS4) 
      npsf_factor=10.**((30.-mock_mag[i])/2.5)/np.sum(npsf_xy) 
      npsf_xy *= npsf_factor 

      im_rangex=[max(mock_x[i]-npsf_size[1]/2,0), min(mock_x[i]-npsf_size[1]/2+npsf_size[1], im_size[1])] 
      im_rangey=[max(mock_y[i]-npsf_size[0]/2,0), min(mock_y[i]-npsf_size[0]/2+npsf_size[0], im_size[0])] 
      npsf_rangex=[max(-1*(mock_x[i]-npsf_size[1]/2),0), min(-1*(mock_x[i]-npsf_size[1]/2-im_size[1]),npsf_size[1])] 
      npsf_rangey=[max(-1*(mock_y[i]-npsf_size[0]/2),0), min(-1*(mock_y[i]-npsf_size[0]/2-im_size[0]),npsf_size[0])] 

      im_data[im_rangey[0]:im_rangey[1], im_rangex[0]:im_rangex[1]] = 10. 


     self.result_queue.put(id) 

if __name__ == "__main__": 

    n_cpu=2 
    n_core=6 
    n_processes=n_cpu*n_core*1 
    input_mock_file=sys.argv[1] 

    print "Reading file ", im_file[i] 
    hdu=pyfits.open(im_file[i]) 
    data=hdu[0].data 
    im_size=data.shape 

    im_data_base = multiprocessing.Array(ctypes.c_float, im_size[0]*im_size[1]) 
    im_data = np.ctypeslib.as_array(im_data_base.get_obj()) 
    im_data = im_data.reshape(im_size[0], im_size[1]) 
    im_data[:] = data 
    data=0 
    assert im_data.base.base is im_data_base.get_obj() 

    # run 
    # load up work queue 
    tic=time.time() 
    j_step=np.int(np.ceil(mock_n*1./n_processes)) 
    j_range=range(0,mock_n,j_step) 
    j_range.append(mock_n) 


    work_queue = multiprocessing.Queue() 
    for j in range(np.size(j_range)-1): 
    if work_queue.full(): 
     print "Oh no! Queue is full after only %d iterations" % j 
    work_queue.put((j_range[j:j+2], psf_file[i])) 

    # create a queue to pass to workers to store the results 
    result_queue = multiprocessing.Queue() 

    # spawn workers 
    for j in range(n_processes): 
    worker = Worker(work_queue, result_queue) 
    worker.start() 

    # collect the results off the queue 
    while not work_queue.empty(): 
    result_queue.get() 

    print "Writing file ", mock_im_file[i] 
    hdu[0].data=im_data 
    hdu.writeto(mock_im_file[i]) 
    print "%f s for parallel computation." % (time.time() - tic) 
+1

而不是共享大型陣列你不能把它分成更小的子陣列和發送這些子陣列到子進程?然後將結果合併回原始數組。 – freakish 2013-04-09 13:53:53

+0

也可以考慮使用不同於Python的東西來處理這些巨大的圖像(C插件?)。 – freakish 2013-04-09 14:17:49

回答

3

我認爲這個問題(如你在你的問題建議的話)來自於事實,你是從多個線程在同一陣列中寫作

im_data_base = multiprocessing.Array(ctypes.c_float, im_size[0]*im_size[1]) 
im_data = np.ctypeslib.as_array(im_data_base.get_obj()) 
im_data = im_data.reshape(im_size[0], im_size[1]) 
im_data[:] = data 

雖然我敢肯定,你可以寫成im_data_base在「過程安全」的方式(一個隱含的鎖使用Python來同步訪問數組),我不知道,你可以寫成im_data以過程安全的方式。

因此,我要(儘管我不知道我會解決你的問題),建議你創建一個圍繞im_data

# Disable python implicit lock, we are going to use our own 
im_data_base = multiprocessing.Array(ctypes.c_float, im_size[0]*im_size[1], 
    lock=False) 
im_data = np.ctypeslib.as_array(im_data_base.get_obj()) 
im_data = im_data.reshape(im_size[0], im_size[1]) 
im_data[:] = data 
# Create our own lock 
im_data_lock = Lock() 

然後在過程中的明確鎖,每次需要時獲取鎖修改im_data

self.im_data_lock.acquire() 
im_data[im_rangey[0]:im_rangey[1], im_rangex[0]:im_rangex[1]] = 10 
self.im_data_lock.release() 

予省略了代碼到鎖傳遞到您的過程的構造器,並將其存儲作爲成員字段(self.im_data_lock)爲求簡潔。您還應該將im_data數組傳遞給流程的構造函數,並將其作爲成員字段存儲。

1

在您的示例中,當多個線程寫入圖像/數組中的重疊區域時,會出現問題。所以你必須爲每個鏡像放置一個鎖,或者爲每個鏡像部分創建一組鎖(以減少鎖爭用)。

或者您可以在一組進程中生成圖像修改,並在單獨的單個線程中對圖像進行實際修改。