2016-05-21 64 views
1

我正在使用pyspark來處理我的數據,最後我需要使用rdd.collect()從rdd收集數據。但是,由於內存問題,我的火花崩潰了。我嘗試了很多方法,但沒有運氣。我現在用下面的代碼,處理數據的每個分區一小塊運行:從spark rdd收集大型數據集的最佳做法是什麼?

def make_part_filter(index): 
    def part_filter(split_index, iterator): 
     if split_index == index: 
      for el in iterator: 
       yield el 
    return part_filter 


for part_id in range(rdd.getNumPartitions()): 
    part_rdd = rdd.mapPartitionsWithIndex(make_part_filter(part_id), True) 
    myCollection = part_rdd.collect() 
    for row in myCollection: 
      #Do something with each row 

我目前使用不會崩潰,但新的代碼似乎運行下去。

有沒有更好的方法來收集大型rdd的數據?

+1

相反的運行對列表格式循環的RDD,爲什麼不運行map函數呢? –

+0

實際上,我需要收集rdd中的所有數據並存儲在一個大型數組中,然後提供一個機器學習模塊。 – JamesLi

+1

機器學習模塊接受一個迭代器,還是真的需要一個數組?使用迭代器可以避免一次將所有數據加載到內存中。即使如此,我還是擔心性能,因爲我假設機器學習模塊將用單線程「吃」數據。 –

回答

1

試圖「收集」一個巨大的RDD是有問題的。 「收集」返回一個列表,這意味着整個RDD內容必須存儲在驅動程序的內存中。這是一個「showstopper」問題。通常情況下,人們希望Spark應用程序能夠處理大小超出單個節點內存容量的數據集。

讓我們假設RDD幾乎不適合內存,「收集」的作品。然後我們又有另一個「showstopper」---表現不佳。在您的代碼中,收集的RDD在循環中處理:「for myCollection中的行」。該循環由一個核心執行。因此,不是通過RDD處理數據,而是通過RDD進行處理,其計算分佈在集羣的所有核心中,其中可能有100個(如果不是1000個) - 而是整個數據集上的所有工作都放在單個核心。

0

我不知道這是否是最好的方式,但這是我嘗試過的最好的方式。不知道它是比你好還是差。同樣的想法,把它分成塊,但你可以更靈活的塊大小。

def rdd_iterate(rdd, chunk_size=1000000): 
    indexed_rows = rdd.zipWithIndex().cache() 
    count = indexed_rows.count() 
    print("Will iterate through RDD of count {}".format(count)) 
    start = 0 
    end = start + chunk_size 
    while start < count: 
     print("Grabbing new chunk: start = {}, end = {}".format(start, end)) 
     chunk = indexed_rows.filter(lambda r: r[1] >= start and r[1] < end).collect() 
     for row in chunk: 
      yield row[0] 
     start = end 
     end = start + chunk_size 

使用例子,我想一個巨大的RDD附加到磁盤上的一個CSV文件,而沒有填充與整個RDD Python列表:

def rdd_to_csv(fname, rdd): 
    import csv 
    f = open(fname, "a") 
    c = csv.writer(f) 
    for row in rdd_iterate(rdd): # with abstraction, iterates through entire RDD 
     c.writerows([row]) 
    f.close() 

rdd_to_csv("~/test.csv", my_really_big_rdd) 
相關問題