
Python: Unpacking a list of objects into a dictionary

I have a list of objects that needs to be unpacked into a dictionary efficiently. There are more than 2,000,000 objects in the list, and the operation currently takes over 1.5 hours to complete. I would like to know whether this can be done more efficiently. The objects in the list are based on this class:

class ResObj: 
    def __init__(self, index, result): 
        self.loc = index    # the location where the values go in the final result dictionary 
        self.res = result   # a dictionary holding the values for this location 

For example, a single object might hold:

    obj.loc = 2 
    obj.res = {'value1': 5.4, 'value2': 2.3, 
               'valuen': {'sub_value1': 4.5, 'sub_value2': 3.4, 'sub_value3': 7.6}} 

Currently I use this method to do the unpacking:

import numpy 

def make_final_result(list_of_results): 
    no_sub_result_variables = ['value1', 'value2'] 
    sub_result_variables = ['valuen'] 
    sub_value_variables = ['sub_value1', 'sub_value2', 'sub_value3'] 

    final_result = {} 
    num_of_results = len(list_of_results) 
    # Preallocate one numpy array per variable / sub-variable. 
    for var in no_sub_result_variables: 
        final_result[var] = numpy.zeros(num_of_results) 
    for var in sub_result_variables: 
        final_result[var] = {sub_var: numpy.zeros(num_of_results) 
                             for sub_var in sub_value_variables} 

    # Fill the arrays one object at a time. 
    for obj in list_of_results: 
        i = obj.loc 
        result = obj.res 
        for var in no_sub_result_variables: 
            final_result[var][i] = result[var] 
        for var in sub_result_variables: 
            for name in sub_value_variables: 
                try: 
                    final_result[var][name][i] = result[var][name] 
                except KeyError: 
                    ## TODO Add a debug check 
                    pass 

I have been trying to parallelize this using multiprocessing.Manager().dict() and Manager().Array(), but I could only ever get 2 processes to do work (even though I manually set the number of processes to the number of CPUs, which is 24). Can you suggest a faster way to improve the performance? Thank you.
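For context, the Manager-based attempt presumably looked something like the sketch below (a hypothetical reconstruction, not the asker's actual code). Every indexed access on a Manager proxy is a round-trip to a separate manager process, which goes a long way towards explaining why this approach parallelizes so poorly:

from multiprocessing import Manager 

if __name__ == '__main__': 
    manager = Manager() 
    # The shared array lives in a separate manager process. 
    shared_value1 = manager.Array('d', [0.0] * 10) 
    # Every indexed write is pickled and shipped over a pipe to the 
    # manager process, so parallel workers mostly sit blocked on IPC. 
    shared_value1[2] = 5.4 
    print(shared_value1[2]) 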


A lot of this doesn't look like good use of data structures, or particularly efficient object initialization, to me, but it's hard to tell where you're starting from. Can you give example input and output? –


Please post a short, self-contained '.py' file that takes more than 1.5 hours to run on your machine: http://sscce.org/ – pts


You seem to have nested loops: 'for var in no_sub_result_variables:' and 'for var in sub_result_variables:'. Do you really want nested loops here? If so, please rename the inner loop variable to 'var2' for clarity. – pts

Answers


Remove some indentation so that your loops are not nested:

for obj in list_of_results: 
    i = obj.loc 
    result = obj.res 
    for var in no_sub_result_variables: 
        final_result[var][i] = result[var] 
    for var in sub_result_variables: 
        for name in sub_value_variables: 
            try: 
                final_result[var][name][i] = result[var][name] 
            except KeyError: 
                ## TODO Add a debug check 
                pass 

Thanks. I have edited the question. – ssm


Nested numpy arrays do not seem to be the best way to structure your data. You can use numpy's structured arrays to get a more intuitive data structure.

import numpy as np 

# example values 
values = [ 
    { 
        "v1": 0, 
        "v2": 1, 
        "vs": { 
            "x": 2, 
            "y": 3, 
            "z": 4, 
        } 
    }, 
    { 
        "v1": 5, 
        "v2": 6, 
        "vs": { 
            "x": 7, 
            "y": 8, 
            "z": 9, 
        } 
    } 
] 

def value_to_record(value): 
    """Take a dictionary and convert it to an array-like format""" 
    return ( 
        value["v1"], 
        value["v2"], 
        ( 
            value["vs"]["x"], 
            value["vs"]["y"], 
            value["vs"]["z"] 
        ) 
    ) 

# define what a record looks like -- f8 is an 8-byte float 
dtype = [ 
    ("v1", "f8"), 
    ("v2", "f8"), 
    ("vs", [ 
        ("x", "f8"), 
        ("y", "f8"), 
        ("z", "f8") 
    ]) 
] 

# create the actual array 
arr = np.fromiter(map(value_to_record, values), dtype=dtype, count=len(values)) 

# access an individual record 
print(arr[0])   # prints (0.0, 1.0, (2.0, 3.0, 4.0)) 
# access a specific value 
assert arr[0]['vs']['x'] == 2 
# access all values of a specific field 
print(arr['v2'])   # prints [ 1.  6.] 
assert arr['v2'].sum() == 7 

Generating the data this way, creating a 2,000,000-element array took a matter of seconds on my machine.
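If you want to sanity-check that on your own hardware, a rough benchmark along these lines should do; it reuses the value_to_record and dtype defined above, and the make_value helper is made up purely to produce test input:

import time 
import numpy as np 

def make_value(i): 
    # Synthetic record shaped like the example dicts above. 
    return {"v1": i, "v2": i, "vs": {"x": i, "y": i, "z": i}} 

n = 2000000 
start = time.time() 
arr = np.fromiter((value_to_record(make_value(i)) for i in range(n)), 
                  dtype=dtype, count=n) 
print("built %d records in %.2f s" % (n, time.time() - start)) 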

To make this work with your ResObj objects, sort them by their loc attribute and then pass each object's res attribute to the value_to_record function, as sketched below.
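A minimal sketch of that glue code (it assumes value_to_record and dtype have been rewritten for your real keys, i.e. value1/value2/valuen, and that every loc from 0 to N-1 occurs exactly once):

# Hypothetical adapter: ResObj list -> structured array, ordered by loc. 
sorted_results = sorted(list_of_results, key=lambda obj: obj.loc) 
arr = np.fromiter((value_to_record(obj.res) for obj in sorted_results), 
                  dtype=dtype, count=len(sorted_results)) 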


You can distribute the work between processes by key name.
Here I create a pool of workers and pass each of them a var and an optional sub-variable name.
The huge dataset is shared with the workers via a cheap fork.
Unpacker.unpack picks the specified variables out of the ResObj list and returns them as an np.array.
The main loop in make_final_result combines the arrays into final_result.
Python 2:

from collections import defaultdict 
from multiprocessing import Pool 
import numpy as np 

class ResObj(object): 
    def __init__(self, index=None, result=None): 
        self.loc = index    # the location where the values go in the final result dictionary 
        self.res = result   # a dictionary holding the values for this location 

        # hard-coded sample data so that ResObj() yields a test object 
        self.loc = 2 
        self.res = {'value1': 5.4, 'value2': 2.3, 
                    'valuen': {'sub_value1': 4.5, 'sub_value2': 3.4, 'sub_value3': 7.6}} 

class Unpacker(object): 
    @classmethod 
    def cls_init(cls, list_of_results): 
        cls.list_of_results = list_of_results 

    @classmethod 
    def unpack(cls, var, name): 
        list_of_results = cls.list_of_results 
        result = np.zeros(len(list_of_results)) 
        if name is None: 
            for i, it in enumerate(list_of_results): 
                result[i] = it.res[var] 
        else: 
            for i, it in enumerate(list_of_results): 
                result[i] = it.res[var][name] 
        return var, name, result 

# Pool.imap doesn't accept instance/class methods, hence this module-level wrapper 
def Unpacker_unpack((var, name)): 
    return Unpacker.unpack(var, name) 


def make_final_result(list_of_results): 
    no_sub_result_variables = ['value1', 'value2'] 
    sub_result_variables = ['valuen'] 
    sub_value_variables = ['sub_value1', 'sub_value2', 'sub_value3'] 

    pool = Pool(initializer=Unpacker.cls_init, initargs=(list_of_results,)) 
    final_result = defaultdict(dict) 

    def key_generator(): 
        for var in no_sub_result_variables: 
            yield var, None 
        for var in sub_result_variables: 
            for name in sub_value_variables: 
                yield var, name 

    for var, name, result in pool.imap(Unpacker_unpack, key_generator()): 
        if name is None: 
            final_result[var] = result 
        else: 
            final_result[var][name] = result 
    return final_result 

if __name__ == '__main__': 
    print make_final_result([ResObj() for x in xrange(10)]) 

Make sure you are not on Windows: it lacks fork, so multiprocessing would have to pipe the entire dataset to each of the 24 worker processes.
Hope this helps.
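For what it's worth, the tuple-parameter unpacking used in the Unpacker_unpack wrapper is no longer valid syntax in Python 3 (removed by PEP 3113). A minimal sketch of porting just the wrapper and the entry point, with the rest of the code unchanged (note the cheap-fork sharing still requires a platform where fork is the start method):

# Hypothetical Python 3 version of the wrapper and entry point. 
def Unpacker_unpack(args): 
    var, name = args    # unpack manually; tuple parameters are gone in Py3 
    return Unpacker.unpack(var, name) 

if __name__ == '__main__': 
    print(make_final_result([ResObj() for x in range(10)])) 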