2017-06-19 48 views
0

我正在使用最新版本的Spark(2.1.1)。我通過spark.read.csv將多個csv文件讀取到dataframe。 使用此數據框處理後,如何將其保存爲輸出帶有特定名稱的csv文件。Pyspark:將df寫入具有特定名稱的文件,劇情df

例如,有100個輸入文件(in1.csv,in2.csv,in3.csv,... in100.csv)。 屬於in1.csv的行應保存爲in1-result.csv。屬於in2.csv的行應保存爲in2-result.csv等。(默認文件名將像part-xxxx-xxxxx這是不可讀的)

我已經看到partitionBy(col),但看起來像它可以只是按列分區。

另一個問題是我想繪製我的數據幀。 Spark沒有內置的圖庫。許多人使用df.toPandas()轉換爲熊貓並繪製它。有沒有更好的解決方案?由於我的數據非常大,並且toPandas()會導致內存錯誤。我正在處理服務器,並希望將圖像保存爲圖像而不是顯示。

+0

的可能的複製[如何保存火花數據幀在磁盤上的CSV?](https://stackoverflow.com/questions/33174443/how-to-save-a-spark-dataframe-as-csv -on-disk) – eliasah

+0

我不問如何將它正常保存到磁盤。我想用特定的名字保存它。防爆。 in1.csv中的行將被寫爲in1-result.csv。 (這個名字不是part-xxxx) – NoobProgrammer

+2

spark使用hadoop,除非你使用MultipleTextOutputFormat和字符串RDD將它保存爲hadoop文件,那麼在火花中沒有開箱即用的解決方案 – eliasah

回答

1

,我建議如下解決方案,用於與輸入文件中的特定目錄寫入數據框:

  • 在循環爲每個文件:
    • 讀csv文件
    • 有關輸入文件的信息添加新列使用withColumn轉換
    • 將所有數據幀聯合使用union轉換
  • 也需要預處理
  • 使用partitionBy通過提供輸入文件信息欄,讓與同一輸入文件行將被保存在相同的輸出目錄保存結果

代碼可能是這樣的:

all_df = None 
for file in files: # where files is list of input CSV files that you want to read 
    df = spark.read.csv(file) 
    df.withColumn("input_file", file) 
    if all_df is None: 
     all_df = df 
    else: 
     all_df = all_df.union(df) 

# do preprocessing 

result.write.partitionBy(result.input_file).csv(outdir) 
+0

非常感謝你這個好主意。我對你的解決方案做類似的事情。我的代碼如下所示:df = df.withColumn(「filename」,input_file_name()),然後是df.write.partitionBy(「filename」)。format('csv')。save(「mypath」)文件名是仍然是part-xxxx,但輸出文件夾與輸入匹配仍然很棒。 – NoobProgrammer