2016-12-19 120 views
2

我試圖使用df.write.csv將數據追加到我的csv文件。這就是我下面的火花文件http://spark.apache.org/docs/2.0.1/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter後所做的:如何在pyspark中使用df.write.csv追加到csv文件?

from pyspark.sql import DataFrameWriter 
..... 
df1 = sqlContext.createDataFrame(query1) 
df1.write.csv("/opt/Output/sqlcsvA.csv", append) #also tried 'mode=append' 

執行上面的代碼給我錯誤:

NameError: name 'append' not defined

沒有追加,錯誤:

The path already exists.

+0

是否有一個由sqlcsvA.csv調用的文件? –

+0

是的輸出被複制到'sqlcsvA.csv'文件。 – kaks

+0

你可以刪除,並再次從代碼創建此文件? –

回答

0

我不知道關於Python ,但在Scala和Java中,可以通過以下方式設置保存模式:

df.write.mode("append").csv("pathToFile") 

我認爲它應該在Python中類似。 This可能會有所幫助。

+0

我試過你在python中說過的話。但是,我的輸出的每一行都被複制到一個名爲'sqlcsvA.csv'的文件夾中的獨立csv文件中。它們不會被複制到一個單獨的csv文件中。 – kaks

+1

@kaks,看起來你將不得不手動合併這些文件。看看這個[問題](http://stackoverflow.com/questions/31674530/write-single-csv-file-using-spark-csv)。例如,人們正在使用[FileUtil.copyMerge](https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/fs/FileUtil.html#copyMerge(org.apache.hadoop.fs。文件系統,%20org.apache.hadoop.fs.Path,%20org.apache.hadoop.fs.FileSystem,%20org.apache.hadoop.fs.Path,%20boolean,%20org.apache.hadoop.conf.Configuration,% 20java.lang.String))在Java中。 –

+0

@kaks,請注意,如果您讀取結果(在Spark中),則會合並這些文件,並且您有一個DataFrame,其中包含該目錄中所有文件的數據。 –

3
df.write.save(path='csv', format='csv', mode='append', sep='\t') 
+0

這又將輸出分割成不同的文件。它被分區。 – kaks

+2

在寫入之前包含'.coalesce(1)',它會阻止分區,不確定是否會附加結果! 'df.coalesce(1).write.save(path ='csv',format ='csv',mode ='append',sep ='\ t')' – Jarek

+0

謝謝。這一切都是爲了一個文件。 – kaks

2

從文檔: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter 由於V1.4

csv(path, mode=None, compression=None, sep=None, quote=None, escape=None, header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None, timestampFormat=None)

例如

from pyspark.sql import DataFrameWriter 
..... 
df1 = sqlContext.createDataFrame(query1) 
df1.write.csv(path="/opt/Output/sqlcsvA.csv", mode="append") 

如果你想要寫一個文件,你可以在任意這些線路的使用聚結或repartition。不管哪一行,因爲數據幀只是一個DAG執行,在寫入csv之前不會執行任何操作。 repartition &​​3210有效地使用相同的代碼,但合併只能減少分區的數量,其中repartition也可以增加它們。爲了簡單起見,我只是堅持使用repartition

例如

df1 = sqlContext.createDataFrame(query1).repartition(1) 

df1.repartition(1).write.csv(path="/opt/Output/sqlcsvA.csv", mode="append") 

我想在文檔的例子並不是很大,他們沒有表現出比使用路徑其他參數的例子。

參考你試過兩件事情:

(append)

對於工作,就必須命名追加包含值「追加」一個字符串變量。 DataFrameWriter庫中沒有字符串常量,名爲append。即你可以在你的代碼中添加這個,然後它就可以工作。 追加=「追加」

('mode=append')

對於工作的CSV方法必須解析出mode=append字符串以獲取模式的價值,這將是額外的工作時,你可以有一個參數與需要提取的值「append」或「overwrite」完全相同。沒有一個是特殊情況,Python內置,並不特定於pyspark。

另一方面,我建議儘可能使用命名參數。 例如

csv(path="/path/to/file.csv", mode="append") 

,而不是位置參數

csv("/path/to/file.csv", "append") 

它更清晰,並幫助理解。