我正在使用pyspark來處理我的數據,最後我需要使用rdd.collect()從rdd收集數據。但是,由於內存問題,我的火花崩潰了。我嘗試了很多方法,但沒有運氣。我現在用下面的代碼,處理數據的每個分區一小塊運行:從spark rdd收集大型數據集的最佳做法是什麼?
def make_part_filter(index):
def part_filter(split_index, iterator):
if split_index == index:
for el in iterator:
yield el
return part_filter
for part_id in range(rdd.getNumPartitions()):
part_rdd = rdd.mapPartitionsWithIndex(make_part_filter(part_id), True)
myCollection = part_rdd.collect()
for row in myCollection:
#Do something with each row
我目前使用不會崩潰,但新的代碼似乎運行下去。
有沒有更好的方法來收集大型rdd的數據?
相反的運行對列表格式循環的RDD,爲什麼不運行map函數呢? –
實際上,我需要收集rdd中的所有數據並存儲在一個大型數組中,然後提供一個機器學習模塊。 – JamesLi
機器學習模塊接受一個迭代器,還是真的需要一個數組?使用迭代器可以避免一次將所有數據加載到內存中。即使如此,我還是擔心性能,因爲我假設機器學習模塊將用單線程「吃」數據。 –