2016-10-10 28 views
2

Scala 2.10在這裏使用Spark 1.6.2。我有一個類似的(但不是相同的)問題,但是,接受的答案不是SSCCE,並假設了一定數量的關於Spark的「前期知識」;因此我無法重現或理解它。 更重要的是,該問題也僅限於向現有數據框添加新列,而我需要爲數據框中的所有現有行添加列以及值。將StringType列添加到現有的Spark DataFrame,然後應用默認值


所以我想一列添加到現有的星火據幀,然後爲新列的所有行賦予初始(「默認」)值。

val json : String = """{ "x": true, "y": "not true" }""" 
val rdd = sparkContext.parallelize(Seq(json)) 
val jsonDF = sqlContext.read.json(rdd) 

jsonDF.show() 

當我跑,我得到這個下面的輸出(通過.show()):

+----+--------+ 
| x|  y| 
+----+--------+ 
|true|not true| 
+----+--------+ 

現在我想一個新的字段添加到jsonDF,它的創建,並在不修改json串後,使得所得的DF應該是這樣的:

+----+--------+----+ 
| x|  y| z| 
+----+--------+----+ 
|true|not true| red| 
+----+--------+----+ 

含義,我想添加一個新的「z」 colu mn到DF,類型StringType,然後默認所有行包含z - 值爲"red"

從其他的問題,我已拼湊以下僞代碼放在一起:

val json : String = """{ "x": true, "y": "not true" }""" 
val rdd = sparkContext.parallelize(Seq(json)) 
val jsonDF = sqlContext.read.json(rdd) 

//jsonDF.show() 

val newDF = jsonDF.withColumn("z", jsonDF("col") + 1) 

newDF.show() 

但是當我運行它,我得到的是.withColumn(...)方法的編譯器錯誤:

org.apache.spark.sql.AnalysisException: Cannot resolve column name "col" among (x, y); 
    at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:152) 
    at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:152) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.sql.DataFrame.resolve(DataFrame.scala:151) 
    at org.apache.spark.sql.DataFrame.col(DataFrame.scala:664) 
    at org.apache.spark.sql.DataFrame.apply(DataFrame.scala:652) 

我也沒有看到任何API方法可以讓我設置"red"作爲默認值。任何想法,我要去哪裏錯誤?

回答

8

您可以使用​​3210函數。首先,你必須將其導入

import org.apache.spark.sql.functions.lit 

,並用它作爲顯示在下面的列

jsonDF.withColumn("z", lit("red")) 

類型將被自動推斷。