2017-06-29 99 views
0

在下面的代碼中,out.csv採用實木複合地板格式。我錯過了什麼選項將其作爲csv文件編寫?Pyspark:寫入csv寫入實木複合地板而不是csv

import py4j 
from pyspark import SparkConf, SparkContext 
from pyspark import HiveContext as hc 
import os 
from pyspark.sql import SQLContext, Row 

from datetime import datetime 
from pyspark.sql.types import DateType,StringType 
import pyspark.sql.functions as F 

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-csv_2.11:1.5.0' 
conf = SparkConf().setMaster("local[64]").setAppName("My App") 
sc = SparkContext(conf = conf) 
sqlContext = SQLContext(sc) 

#read parquet file into DF 
df = sqlContext.read.parquet('/path/in_parquet') 

# Write to csv 
df_grouped = df.groupBy('column1').agg(F.sum('column2')) 
df_grouped.repartition(1).write.format("com.databricks.spark.csv").option("header", "true").save("/path/out.csv") 

輸出:

留在控制檯的最後幾行。另外,這裏是我用來運行腳本的命令:

spark-submit --master local [*] --driver-memory 12g --packages com.databricks:spark-csv_2.11:1.2.0 MyPyspark .py

$ hdfs dfs -ls /path/out.csv 
Found 2 items 
-rw-r--r-- 3 me devs   0 2017-06-29 12:16 /path/out.csv/_SUCCESS 
-rw-r--r-- 3 me devs  552 2017-06-29 12:16 /path/out.csv/part-00000 
+0

你可以顯示一段輸出嗎? – eliasah

+0

編輯該問題以添加輸出日誌和更多信息。 – PSInf

+0

文件輸出,而不是作業: - | – eliasah

回答

0

Spark正在分別保存數據的每個分區,因此,您會爲每個分區獲取一個文件part-xxxxx。您指定的路徑.save("/path/out.csv")是保存文件的目錄,其中的part-xxxxx文件已經以csv格式存在。

如果您有多個文件和一個小數據集,您可以使用coalesce(1),然後保存結果以收回單個csv文件。對於較大的數據集,我建議先保存,然後將文件與FileUtil.copyMerge()(Hadoop命令)合併。