2016-08-14 76 views
3

我正在嘗試使用spark 1.6.1寫入csv文件。 說我有一個CSV文件是這樣的:無法在spark中寫入csv文件

date,category 
19900108,apples 
19900108,apples 
19900308,peaches 
19900408,peaches 
19900508,pears 
19910108,pears 
19910108,peaches 
19910308,apples 
19910408,apples 
19910508,apples 
19920108,pears 
19920108,peaches 
19920308,apples 
19920408,peaches 
19920508,pears 

我想創建一個輸出CSV文件是這樣的:

date,apples,peaches,pears 
1990,2,2,1 
1991,3,1,1 
1992,1,2,2 

我用這Scala代碼加載文件:

spark-shell --packages com.databricks:spark-csv_2.11:1.2.0 

import org.apache.spark.SparkContext 
import org.apache.spark.SparkConf 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.sql.functions._ 

val sqlContext = new SQLContext(sc) 

var df = sqlContext.read.format("com.databricks.spark.csv").option("header","true").option("inferSchema", "true").load("data/sample.csv") 

df = df.withColumn("year", df.col("date").substr(0,4)) 
df.groupBy("year").pivot("category").agg("category"->"count").show() 

當我運行這個,我得到這個數據幀,這正是我想要的

+----+------+-------+-----+ 
|year|apples|peaches|pears| 
+----+------+-------+-----+ 
|1990|  2|  2| 1| 
|1991|  3|  1| 1| 
|1992|  1|  2| 2| 
+----+------+-------+-----+ 

但是當我嘗試使用此代碼來寫這一個CSV:

df.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("mydata4.csv") 

這是當我打開它,這是不是我要找我收到csv文件。

date,category,year 
19900108,apples,1990 
19900108,apples,1990 
19900308,peaches,1990 
19900408,peaches,1990 
19900508,pears,1990 
19910108,pears,1991 
19910108,peaches,1991 
19910308,apples,1991 
19910408,apples,1991 
19910508,apples,1991 
19920108,pears,1992 
19920108,peaches,1992 
19920308,apples,1992 
19920408,peaches,1992 
19920508,pears,1992 

我錯過了什麼嗎?難道我做錯了什麼?

回答

3

您忘記將查詢結果存儲到新變量中。

val xf = df.groupBy("year").pivot("category").agg("category"->"count") 

然後用你最後的代碼行寫它。

xf.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("mydata4.csv") 
+0

謝謝。它做到了 – ronmac