2017-04-10 31 views
1

我需要處理超過1000萬個光譜數據集。數據結構如下:大約有1000個.fits(.fits是一些數據存儲格式)文件,每個文件包含大約600-1000個光譜,其中每個光譜中有大約4500個元素(因此每個文件返回一個〜1000 * 4500矩陣)。這意味着如果我要遍歷1000萬個條目,每個譜圖將被重複讀取大約10次(或者每個文件將被重複讀取大約10,000次)。雖然相同的光譜重複讀取10次左右,但它不是重複的,因爲每次我提取相同光譜的不同片段。在@Paul Panzer的幫助下,我已經避免多次閱讀同一個文件。實施多處理以處理HPC上的大量輸入/輸出

我有一個包含我需要的所有信息,如座標xy,半徑r,實力s等目錄還包含了信息的目標,我要讀的文件(目錄文件由n1,n2確定)以及我要使用的文件中的哪些光譜(由n3標識)。

我現在的代碼是:

import numpy as np 
from itertools import izip 
import itertools 
import fitsio 

x = [] 
y = [] 
r = [] 
s = [] 
n1 = [] 
n2 = [] 
n3 = [] 
with open('spectra_ID.dat') as file_ID, open('catalog.txt') as file_c: 
    for line1, line2 in izip(file_ID,file_c): 
     parts1 = line1.split() 
     parts2 = line2.split() 
     n1.append(int(parts1[0])) 
     n2.append(int(parts1[1])) 
     n3.append(int(parts1[2])) 
     x.append(float(parts2[0]))   
     y.append(float(parts2[1]))   
     r.append(float(parts2[2])) 
     s.append(float(parts2[3])) 

def data_analysis(n_galaxies): 
    n_num = 0 
    data = np.zeros((n_galaxies), dtype=[('spec','f4',(200)),('x','f8'),('y','f8'),('r','f8'),('s','f8')]) 

    idx = np.lexsort((n3,n2,n1)) 
    for kk,gg in itertools.groupby(zip(idx, n1[idx], n2[idx]), lambda x: x[1:]): 
     filename = "../../data/" + str(kk[0]) + "/spPlate-" + str(kk[0]) + "-" + str(kk[1]) + ".fits" 
     fits_spectra = fitsio.FITS(filename) 
     fluxx = fits_spectra[0].read() 
     n_element = fluxx.shape[1] 
     hdu = fits_spectra[0].read_header() 
     wave_start = hdu['CRVAL1'] 
     logwave = wave_start + 0.0001 * np.arange(n_element) 
     wavegrid = np.power(10,logwave) 

     for ss, plate1, mjd1 in gg: 
      if n_num % 1000000 == 0: 
       print n_num 
      n3new = n3[ss]-1 
      flux = fluxx[n3new] 
      ### following is my data reduction of individual spectra, I will skip here 
      ### After all my analysis, I have the data storage as below: 
      data['spec'][n_num] = flux_intplt 
      data['x'][n_num] = x[ss] 
      data['y'][n_num] = y[ss] 
      data['r'][n_num] = r[ss] 
      data['s'][n_num] = s[ss] 

      n_num += 1 

    print n_num 
    data_output = FITS('./analyzedDATA/data_ALL.fits','rw') 
    data_output.write(data) 

我有點明白多需要刪除一個循環,但指數傳遞給函數。但是,我的功能有兩個循環,而這兩個循環高度相關,所以我不知道如何處理。由於此代碼中最耗時的部分是從磁盤讀取文件,因此多處理需要充分利用內核一次讀取多個文件。任何人都可以點亮我嗎?

回答

0
  1. 擺脫global vars,您不能使用global varsprocesses
  2. 合併您的多個global vars一個容器類或快譯通, 分配相同的頻譜的不同細分成一個數據集
  3. 將您的global with open(...變成def ...
  4. 分開data_output分成自己def ...
  5. 嘗試第一,沒有multiprocessing,這個概念:

    for line1, line2 in izip(file_ID,file_c): 
        data_set = create data set from (line1, line2) 
        result = data_analysis(data_set) 
        data_output.write(data) 
    
  6. 考慮使用2 processes一個文件讀取,一個用於文件寫入。 使用multiprocessing.Pool(processes=n)data_analysis
    使用multiprocessing.Manager().Queue()

+0

你寫下你的代碼雖然processes之間的溝通?我無法清楚地理解它。 –

+0

從第2點開始,第3點,相應地在你的問題中編輯代碼。 – stovfl