我有幾個Excel文件,我需要加載它們並將其加載到Spark DF之前進行預處理。我有這些需要處理的文件的列表。我做這樣的事情在閱讀它們:如何將多個Pandas DF轉換爲單個Spark DF?
file_list_rdd = sc.emptyRDD()
for file_path in file_list:
current_file_rdd = sc.binaryFiles(file_path)
print(current_file_rdd.count())
file_list_rdd = file_list_rdd.union(current_file_rdd)
我那麼有一些映射功能,輪流file_list_rdd
從一組(路徑,字節)的元組(路徑,熊貓數據幀)的元組。這使我可以使用Pandas讀取Excel文件並操作這些文件,使它們在製作成Spark DataFrame之前是均勻的。
如何獲取(文件路徑,Pandas DF)元組的RDD並將其轉換爲單個Spark DF?我意識到可以進行單一轉換的函數,但不是可以執行幾次的函數。
我第一次嘗試這樣的:
sqlCtx = SQLContext(sc)
def convert_pd_df_to_spark_df(item):
return sqlCtx.createDataFrame(item[0][1])
processed_excel_rdd.map(convert_pd_df_to_spark_df)
我猜測,沒有因爲sqlCtx
工作,不與分佈式計算(這是一個猜測,因爲堆棧跟蹤並沒有多大對我而言)。
提前感謝花時間閱讀:)。
不幸的是我得到了數以千計的Excel文件。 – bstempi