2016-06-22 62 views
0

我有我設法paralelize代碼感謝給this question多重 - 限制CPU使用率

1| def function(name, params): 
2| results = fits.open(name) 
3| <do something more to results> 
4| return results 
5| 
6| def function_wrapper(args): 
7|  return function(*args) 
8| 
9| params = [...,...,..., etc]  
10| 
11| p = multiprocessing..Pool(processes=(max([2, mproc.cpu_count() // 10]))) 
12| args_generator = ((name, params) for name in names) 
13| 
14| dictionary = dict(zip(names, p.map(function_wrapper, args_generator))) 

如果我理解正確的是怎麼pool作品,在行指定的進程NUMER 應該是在給定時間產生的最大進程數。因此,這應該限制我的CPU使用率,對吧?我的意思是,按照第11行中設置的方式,我瞭解它,所使用的進程/ CPU的最大數量應該最大爲[2, number_of_cpus/10]

儘管如此,當我運行代碼時,我發現在啓動後不久,所有CPU都處於100%。我錯過了什麼嗎?

注意:對於上下文,我需要將CPU使用率限制爲最大數量的內核,因爲我將使用共享服務器。

UPDATE:添加我的代碼的修剪版本。我沒有打開fits文件,而是創建了一個類似於我的頻譜的嘈雜的高斯曲線(儘管表現更好......)。

修剪它有助於解決問題。內函數fnBootstrapInstance擬合是在一個二維陣列(基本上是一個階梯譜)上進行的,我使用for loop進行了迭代。出於某種原因,刪除循環,解決問題,只使用我指定的核心數量。我的猜測是,由於某種原因,for循環產生了一系列子過程(這就是它在htop上出現的過程)。迭代一次ecehelles譜圖解決了這個問題。

# Imports 
#%matplotlib inline 
import sys 
import numpy as np 
import matplotlib.pyplot as mplt 
import numpy.random as rnd 
import scipy.optimize as opt 
import multiprocessing as mproc 

# Functions ================================================== 
def fnBootstrapInstance(XXX = None, YYY= None, function= None, lenght=None, fitBounds= None, initParams=None, **kwargs): 

    # define samples 
    indexes = sorted(rnd.choice(range(len(XXX)), size=lenght, replace=True)) 
    samplesXXX = XXX[indexes] 
    samplesYYY = YYY[indexes] 

    fitBounds = ([-np.inf,-np.inf,0,-np.inf],[np.inf,np.inf,np.inf,np.inf]) 

    params, cov = opt.curve_fit(function, samplesXXX.ravel(), samplesYYY.ravel(), p0=initParams, 
           bounds = fitBounds, 
           ) 

    return params 

def wrapper_fnBootstrapInstance(args): 
    return fnBootstrapInstance(**args) 

def fnGaussian(dataXXX, Amp, mean, FWHM, B): 
    return B - Amp * np.exp(-4 * np.log(2) * (((dataXXX - mean)/FWHM) ** 2)) 
# Functions ================================================== 


# Noise Parameters 
arrLen = 1000 
noiseAmp = 0. 
noiseSTD = .25 

# Gaussian Data Parameters 
amp = 1. 
mean = 10 
FWHM = 30. 
B = 1. 

# generate random gauss data 
arrGaussXXX = np.linspace(-50, 60,num = arrLen) 
arrGaussNoise = rnd.normal(noiseAmp,noiseSTD, arrLen) 
arrGaussYYY = fnGaussian(arrGaussXXX, amp, mean, FWHM, B) + arrGaussNoise 

# multiprocessing bit 
numIterations = 1000 

mprocPool = mproc.Pool(processes=(max([2, mproc.cpu_count() // 10]))) 

initParams = [max(arrGaussYYY) - min(arrGaussYYY), np.median(arrGaussXXX), 
         max(arrGaussXXX) - min(arrGaussXXX), max(arrGaussYYY)] 

args_generator = [{'XXX':arrGaussXXX, 'YYY':arrGaussYYY, 'function':fnGaussian, 'initParams':initParams, 
        'lenght':200} for n in range(numIterations)] 

fitParams = [] 
for results in mprocPool.imap(wrapper_fnBootstrapInstance, args_generator): 
    fitParams.append([results[0],results[1],results[2],results[3]]) 



bootParams = [(np.nanmedian(param),np.nanstd(param)) for param in np.array(fitParams).T] 
print '\n'.join('{:.2f}+-{:.2f} ({:.1f}%)'.format(param[0],param[1], param[1]/param[0]*100) for param in bootParams) 

mplt.figure(figsize=(20,10)) 
mplt.plot(arrGaussXXX, arrGaussYYY,'+') 
for params in fitParams: 
    mplt.plot(arrGaussXXX,fnGaussian(arrGaussXXX,*params),'r', alpha = .5) 
mplt.show() 


mprocPool.close() 

謝謝大家!

+2

它的確應該(限制使用的CPU數量)。你的代碼示例甚至在語法上都不是有效的,但是更少運行,所以不可能說出什麼可能是錯誤的。 – torek

+0

'mproc.cpu_count()'的輸出是什麼? 'mproc.cpu_count()// 10'的輸出是什麼? – proinsias

+1

此代碼限制了您應該能夠獨立驗證的池中進程的數量(例如,Linux上的'ps x')。但是這也取決於這些流程的功能。如果他們也在拆分多進程或調用某些創建大量線程的東西(可能是'pandas'),那麼您仍然會使用所有的cpus。 – tdelaney

回答

1

考慮使用multiprocessing.pool.ThreadPool。它提供了與multiprocessing.Pool相同的API,但將工作負載抽象爲線程集合。請注意,如果您的CPU支持超線程,那麼它很可能會將工作負載分配給物理內核。

+0

謝謝,我會研究它。 – jorgehumberto