2016-10-04 35 views
3

我與pyspark 2.0,hadoop 2.7.2一起工作。 這裏是我的代碼:如何將RDD保存到單個實木複合地板文件?

def func(df): 
    new_df = pd.DataFrame(df['id']) 
    new_df['num'] = new_df['num'] * 12 
    return new_df 

set = sqlContext.read.parquet("data_set.parquet") 
columns = set.columns 
map_res = set.rdd.mapPartitions(lambda iter_: func(pd.DataFrame(list(iter_), 
                columns=columns))) 

現在,我需要map_res RDD保存爲一個文件拼花new.parquet。 有沒有什麼辦法可以在保存之前不創建大型數據框呢?或者可能會分別保存RDD的每個分區,然後合併所有保存的文件?

P.s.由於其真正的大尺寸,我想在不創建數據框的情況下進行管理。

+0

@santon看起來需要將所有單個數據框合併到保留模式的大數據框中。將它們保留爲RDD的元素將不允許像DataFrame一樣操作結果。 –

+0

@ИванСудос正確的,所以我不希望所有的數據被移動到一個節點 –

+0

@santon當你做管道單個parquet文件作爲參數更容易處理 –

回答

2

只有2種方法來做到這一點:

一種是使用"coalesce(1)" 這將確保所有的數據保存到1個文件,而不是多個文件(200火花默認不分區)使用dataframe.write.save("/this/is/path")

另一種選擇是將輸出寫入配置單元表,然後使用將被製表符分隔的hive -e "select * from table" > data.tsv

1

我的建議是:

dataframes = [] 
#creating index 
map_res = map_res.zipWithIndex() 
# setting index as key 
map_res = map_res.map(lambda x: (x[1],x[0])) 
# creating one spark df per element 
for i in range(0, map_res.count()): 
    partial_dataframe_pd = map_res.lookup(i) 
    partial_dataframe = sqlContext.createDataFrame(partial_dataframe_pd) 
    dataframes.append(partial_dataframe) 
# concatination 
result_df = dataframes.pop() 
for df in dataframes: 
    result_df.union(df) 
#saving 
result_df.write.parquet("...") 

如果你有小數目的分區(2-100),那麼它應該工作相當快。

相關問題