我正在使用spark-1.3.1(pyspark),並且使用SQL查詢生成了一個表。我現在有一個對象是一個DataFrame。我想將這個DataFrame對象(我稱它爲「table」)導出到一個csv文件,以便我可以操作它並繪製列。如何將DataFrame「表」導出到csv文件?如何將pyspark中的表數據框導出爲csv?
謝謝!
我正在使用spark-1.3.1(pyspark),並且使用SQL查詢生成了一個表。我現在有一個對象是一個DataFrame。我想將這個DataFrame對象(我稱它爲「table」)導出到一個csv文件,以便我可以操作它並繪製列。如何將DataFrame「表」導出到csv文件?如何將pyspark中的表數據框導出爲csv?
謝謝!
如果數據幀適合於駕駛員記憶,你想保存到本地文件系統,您可以用toPandas
方法轉換Spark DataFrame當地Pandas DataFrame,然後簡單地使用to_csv
:
df.toPandas().to_csv('mycsv.csv')
否則,你可以使用spark-csv:
星火1.3
df.save('mycsv.csv', 'com.databricks.spark.csv')
星火1.4+
df.write.format('com.databricks.spark.csv').save('mycsv.csv')
火花2.0+,你可以直接使用csv
數據來源:
df.write.csv('mycsv.csv')
如果您不能使用火花CSV,你可以做到以下幾點:
df.rdd.map(lambda x: ",".join(map(str, x))).coalesce(1).saveAsTextFile("file.csv")
如果您需要處理字符串換行符或逗號不起作用。使用這個:
import csv
import cStringIO
def row2csv(row):
buffer = cStringIO.StringIO()
writer = csv.writer(buffer)
writer.writerow([str(s).encode("utf-8") for s in row])
buffer.seek(0)
return buffer.read().strip()
df.rdd.map(row2csv).coalesce(1).saveAsTextFile("file.csv")
這怎麼樣(在你不想要一個班輪)?
for row in df.collect():
d = row.asDict()
s = "%d\t%s\t%s\n" % (d["int_column"], d["string_column"], d["string_column"])
f.write(s)
f是一個打開的文件描述符。此外,分隔符是一個TAB字符,但很容易改變爲任何你想要的。
對於Apache Spark 2+,爲了將數據幀保存到單個csv文件中。使用以下命令:
query.repartition(1).write.csv("cc_out.csv", sep='|')
這裏1
表示我只需要一個csv分區。您可以根據您的要求進行更改。
超級回答。對於第一個選項,如果我想寫入管道分隔文件而不是逗號分隔的CSV,這可能嗎? –
如果你有火花數據幀,你可以使用'df.write.csv('/ tmp/lookatme /')',並且會在'/ tmp/lookatme'中放置一組csv文件。使用spark要比序列化快得多在熊貓。唯一的缺點是你最終會得到一組csvs而不是一個,如果目標工具不知道如何連接它們,你需要自己做。 – Txangel
讓csv脫離火花是一件大事。有關第一種解決方案的一些有趣之處在於'to_csv'工作時無需導入熊貓。 '.toPandas'是Spark的一部分,可能會隱式導入它.. – cardamom