2016-03-02

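Using a Python Pool inside a loop

I am trying to run some calculations on a large amount of data. The calculations consist of simple correlations, but the data set is large: I stared at my computer for more than 10 minutes with no output at all.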

I then tried multiprocessing.Pool. This is my current code:

import collections

import numpy
from multiprocessing import Pool
from haversine import haversine

def calculateCorrelation(data_1, data_2, dist): 
    """ 
    Fill the correlation matrix between data_1 and data_2 
    :param data_1: dictionary {key : [coordinates]} 
    :param data_2: dictionary {key : [coordinates]} 
    :param dist: maximum distance between coordinates to be considered, in kilometers.
    :return: numpy array containing the correlation between each complaint category. 
    """ 
    pool = Pool(processes=20) 

    data_1 = collections.OrderedDict(sorted(data_1.items())) 
    data_2 = collections.OrderedDict(sorted(data_2.items())) 
    data_1_size = len(data_1)           
    data_2_size = len(data_2) 

    corr = numpy.zeros((data_1_size, data_2_size)) 

    for index_1, key_1 in enumerate(data_1):
        for index_2, key_2 in enumerate(data_2):  # Forming pairs
            type_1 = data_1[key_1]  # List of data in data_1 of type *i*
            type_2 = data_2[key_2]  # List of data in data_2 of type *j*
            result = pool.apply_async(correlation, args=[type_1, type_2, dist])
            corr[index_1, index_2] = result.get()
    pool.close()
    pool.join()
    return corr


def correlation(type_1, type_2, dist):
    in_range = 0
    for l1 in type_1:  # Coordinates of a data in data_1
        for l2 in type_2:  # Coordinates of a data in data_2
            p1 = (float(l1[0]), float(l1[1]))
            p2 = (float(l2[0]), float(l2[1]))
            if haversine(p1, p2) <= dist:  # Distance between two data of types *i* and *j*
                in_range += 1  # Number of data in data_2 inside area of data in data_1
    total = float(len(type_1) * len(type_2))
    if total != 0:
        return in_range / total  # Correlation between category *i* and *j*
    return 0.0  # No pairs to compare

corr = calculateCorrelation(permiters_per_region, complaints_per_region, 20) 

However, the speed did not improve. No parallel processing seems to be happening, since a single thread does almost all of the work. In some cases, all the Python workers sit at 0.0% CPU while one thread uses 100%.

Am I missing something?

'data_2 = collections.OrderedDict(sorted(data_1.items()))' should be 'data_2.items()' –

OK, thank you, @GarrettR! – pceccon

Answer

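In the loop that generates the jobs, you call apply_async and then immediately wait for it to complete, which effectively serializes the jobs. You can instead add the result objects to a queue (a plain list is enough) and wait on them only after all the work has been dispatched (see below), or even move to the map method.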

def calculateCorrelation(data_1, data_2, dist): 
    """ 
    Fill the correlation matrix between data_1 and data_2 
    :param data_1: dictionary {key : [coordinates]} 
    :param data_2: dictionary {key : [coordinates]} 
    :param dist: maximum distance between coordinates to be considered, in kilometers.
    :return: numpy array containing the correlation between each complaint category. 
    """ 
    pool = Pool(processes=20) 
    results = [] 

    data_1 = collections.OrderedDict(sorted(data_1.items())) 
    data_2 = collections.OrderedDict(sorted(data_2.items())) 
    data_1_size = len(data_1)           
    data_2_size = len(data_2) 

    corr = numpy.zeros((data_1_size, data_2_size)) 

    for index_1, key_1 in enumerate(data_1):
        for index_2, key_2 in enumerate(data_2):  # Forming pairs
            type_1 = data_1[key_1]  # List of data in data_1 of type *i*
            type_2 = data_2[key_2]  # List of data in data_2 of type *j*
            result = pool.apply_async(correlation, args=[type_1, type_2, dist])
            results.append((result, index_1, index_2))  # Do not wait yet
    # Every job is dispatched; now collect the results
    for result, index_1, index_2 in results:
        corr[index_1, index_2] = result.get()
    pool.close()
    pool.join()
    return corr
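
The map method mentioned in this answer could look roughly like the sketch below. It is not the asker's code. The names haversine_km, correlation_task and correlation_matrix are made up for illustration, the distance function is a plain-math stand-in for the haversine package, and the sample coordinates are invented. Because map passes one argument per call, each job is packed into a tuple.

```python
import math
from itertools import product
from multiprocessing import Pool


def haversine_km(p1, p2):
    """Great-circle distance in km between two (lat, lon) points in degrees."""
    lat1, lon1, lat2, lon2 = map(math.radians, (p1[0], p1[1], p2[0], p2[1]))
    a = (math.sin((lat2 - lat1) / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin((lon2 - lon1) / 2) ** 2)
    return 2 * 6371.0 * math.asin(math.sqrt(a))


def correlation_task(task):
    """Unpack one (type_1, type_2, dist) tuple and return the in-range fraction."""
    type_1, type_2, dist = task
    total = len(type_1) * len(type_2)
    if total == 0:
        return 0.0
    in_range = sum(1 for l1 in type_1 for l2 in type_2
                   if haversine_km(l1, l2) <= dist)
    return in_range / float(total)


def correlation_matrix(data_1, data_2, dist, pool):
    """Return one row of fractions per sorted key of data_1."""
    keys_1, keys_2 = sorted(data_1), sorted(data_2)
    tasks = [(data_1[k1], data_2[k2], dist)
             for k1, k2 in product(keys_1, keys_2)]
    flat = pool.map(correlation_task, tasks)  # results keep the task order
    width = len(keys_2)
    return [flat[i * width:(i + 1) * width] for i in range(len(keys_1))]


if __name__ == "__main__":
    # Invented sample data, only to exercise the functions
    permits = {"a": [(0.0, 0.0)], "b": [(10.0, 10.0)]}
    complaints = {"x": [(0.0, 0.1)], "y": [(10.0, 10.1), (50.0, 50.0)]}
    pool = Pool(processes=2)
    try:
        print(correlation_matrix(permits, complaints, 20, pool))
    finally:
        pool.close()
        pool.join()
```

map blocks until every task has finished, but the tasks themselves are spread over the workers, so there is no per-job wait inside the loop.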

When you say "wait for it to complete", do you mean the 'result.get()' call? Is that a blocking call? –

Yes! That is the problem. 'result.get()' blocks until the job finishes and then returns the result. – tdelaney
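
The blocking behaviour can be seen with a small timing sketch. This is only an illustration under assumptions: slow_square is a made-up job that sleeps, and ThreadPool (which exposes the same apply_async and get calls as Pool) is swapped in so the example does not depend on pickling or on a real CPU-bound workload.

```python
import time
from multiprocessing.pool import ThreadPool


def slow_square(x, delay=0.2):
    """Stand-in for an expensive job: sleep, then return x squared."""
    time.sleep(delay)
    return x * x


def get_inside_loop(pool, values):
    """Submit one job and wait for it right away, so jobs run one by one."""
    start = time.time()
    results = [pool.apply_async(slow_square, (v,)).get() for v in values]
    return results, time.time() - start


def get_after_dispatch(pool, values):
    """Submit every job first, then collect, so the workers overlap."""
    start = time.time()
    pending = [pool.apply_async(slow_square, (v,)) for v in values]
    results = [p.get() for p in pending]
    return results, time.time() - start


if __name__ == "__main__":
    pool = ThreadPool(4)
    try:
        print(get_inside_loop(pool, range(4)))
        print(get_after_dispatch(pool, range(4)))
    finally:
        pool.close()
        pool.join()
```

Both functions return the same results; the first should take roughly the sum of all delays, the second roughly one delay when there are enough workers.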

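https://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.AsyncResult.get It is only implied, since the docs just say: "Return the result when it arrives. If timeout is not None and [...] If the remote call raised an exception then that exception will be reraised by get()." I was just curious –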
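
The two behaviours in that documentation quote, the timeout and the re-raised exception, can be tried out with a sketch like the following. The job functions and the describe_get helper are hypothetical, and ThreadPool is again used as a stand-in for Pool because it returns the same kind of result object.

```python
import multiprocessing
import time
from multiprocessing.pool import ThreadPool


def slow_job(delay):
    """Made-up job that takes `delay` seconds."""
    time.sleep(delay)
    return "done"


def failing_job():
    """Made-up job that always fails inside the worker."""
    raise ValueError("raised inside the worker")


def describe_get(pool):
    """Record what get() does for a slow job and for a failing job."""
    outcomes = {}
    slow = pool.apply_async(slow_job, (0.5,))
    try:
        slow.get(timeout=0.05)  # result is not ready yet
    except multiprocessing.TimeoutError:
        outcomes["timeout"] = True
    outcomes["slow"] = slow.get()  # no timeout: block until the job is done
    try:
        pool.apply_async(failing_job).get()
    except ValueError as exc:  # the worker's exception surfaces in the caller
        outcomes["error"] = str(exc)
    return outcomes


if __name__ == "__main__":
    pool = ThreadPool(2)
    try:
        print(describe_get(pool))
    finally:
        pool.close()
        pool.join()
```

A timed-out get() does not cancel the job, which is why the second get() on the same result object still returns its value.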