2017-04-17 101 views
0

我有幾個Excel文件,我需要加載它們並將其加載到Spark DF之前進行預處理。我有這些需要處理的文件的列表。我做這樣的事情在閱讀它們:如何將多個Pandas DF轉換爲單個Spark DF?

file_list_rdd = sc.emptyRDD() 

for file_path in file_list: 
    current_file_rdd = sc.binaryFiles(file_path) 
    print(current_file_rdd.count()) 
    file_list_rdd = file_list_rdd.union(current_file_rdd) 

我那麼有一些映射功能,輪流file_list_rdd從一組(路徑,字節)的元組(路徑,熊貓數據幀)的元組。這使我可以使用Pandas讀取Excel文件並操作這些文件,使它們在製作成Spark DataFrame之前是均勻的。

如何獲取(文件路徑,Pandas DF)元組的RDD並將其轉換爲單個Spark DF?我意識到可以進行單一轉換的函數,但不是可以執行幾次的函數。

我第一次嘗試這樣的:

sqlCtx = SQLContext(sc) 

def convert_pd_df_to_spark_df(item): 
    return sqlCtx.createDataFrame(item[0][1]) 

processed_excel_rdd.map(convert_pd_df_to_spark_df) 

我猜測,沒有因爲sqlCtx工作,不與分佈式計算(這是一個猜測,因爲堆棧跟蹤並沒有多大對我而言)。

提前感謝花時間閱讀:)。

+0

不幸的是我得到了數以千計的Excel文件。 – bstempi

回答

0

爲什麼不列出數據框或文件名,然後在循環中調用union。事情是這樣的:

如果大熊貓dataframes:

dfs = [df1, df2, df3, df4] 
sdf = None 
for df in dfs: 
    if sdf: 
     sdf = sdf.union(spark.createDataFrame(df)) 
    else: 
     sdf = spark.createDataFrame(df) 

如果文件名:

names = [name1, name2, name3, name4] 
sdf = None 
for name in names: 
    if sdf: 
     sdf = sdf.union(spark.createDataFrame(pd.read_excel(name)) 
    else: 
     sdf = spark.createDataFrame(pd.read_excel(name)) 
+0

如果我有成千上萬的Excel文件,這將會很慢。我的方法開始時的目標是試圖利用Spark的並行化。如果我可以在for循環中做到這一點,我根本不需要Spark。在我的情況下,幾個千兆字節的Excel數據。 – bstempi

+0

@ zero323在'processed_excel_rdd'上調用'flatMap(lambda x:x [1] .values)'給我一堆nd_array對象。在裝滿nd_array對象的RDD上調用'toDF'會導致堆棧跟蹤:https://pastebin.com/nX4gkXvg – bstempi

0

我解決了這個寫這樣的功能:

def pd_df_to_row(rdd_row): 
    key = rdd_row[0] 
    pd_df = rdd_row[1]   

    rows = list() 
    for index, series in pd_df.iterrows(): 
     # Takes a row of a df, exports it as a dict, and then passes an unpacked-dict into the Row constructor 

     row_dict = {str(k):v for k,v in series.to_dict().items()} 
     rows.append(Row(**row_dict)) 

    return rows 

你可以調用它通過調用類似於:

processed_excel_rdd = processed_excel_rdd.flatMap(pd_df_to_row) 

pd_df_to_row現在已收集Spark 對象。現在,您可以說:

processed_excel_rdd.toDF() 

有可能是一些比Series更有效 - >dict - > 操作,但是這讓我通過。

相關問題