2016-11-16 128 views
3

我想轉換RDD在Spark2.0轉換RDD到數據幀在2.0

val conf=new SparkConf().setAppName("dataframes").setMaster("local") 
val sc=new SparkContext(conf) 
val sqlCon=new SQLContext(sc) 
import sqlCon.implicits._ 
val rdd=sc.textFile("/home/cloudera/alpha.dat").persist() 
val row=rdd.first() 
val data=rdd.filter { x => !x.contains(row) } 

data.foreach { x => println(x) } 


case class person(name:String,age:Int,city:String) 
val rdd2=data.map { x => x.split(",") } 
val rdd3=rdd2.map { x => person(x(0),x(1).toInt,x(2)) } 
val df=rdd3.toDF() 


df.printSchema(); 
df.registerTempTable("alpha") 
val df1=sqlCon.sql("select * from alpha") 
df1.foreach { x => println(x) } 

到數據幀,但在toDF我得到一個錯誤以下()。 ---> 「VAL DF = rdd3.toDF()」

Multiple markers at this line: 
- Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case 
classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. 
- Implicit conversion found: rdd3 ⇒ rddToDatasetHolder(rdd3): (implicit evidence$4: 
org.apache.spark.sql.Encoder[person])org.apache.spark.sql.DatasetHolder[person] 

如何以上使用toDF()

回答

0

有一個簡單的錯誤,我在主方法內定義了case類。刪除後,我能夠將RDD轉換爲DataFrame。

package sparksql 

import org.apache.spark.SparkConf 
import org.apache.spark.sql.SparkSession 
import org.apache.spark.sql.Encoders 
import org.apache.spark.SparkContext 

object asw { 

case class Person(name:String,age:Int,city:String) 
def main(args: Array[String]): Unit = { 

val conf=new SparkConf().setMaster("local").setAppName("Dataframe") 
    val sc=new SparkContext(conf) 
val spark=SparkSession.builder().getOrCreate() 
import spark.implicits._ 


val rdd1=sc.textFile("/home/cloudera/alpha.dat") 
val row=rdd1.first() 
val data=rdd1.filter { x => !x.contains(row) } 
val rdd2=data.map { x => x.split(",") } 
val df=rdd2.map { x => Person(x(0),x(1).toInt,x(2)) }.toDF() 
df.createOrReplaceTempView("rdd21") 
spark.sql("select * from rdd21").show() 

} 
} 
1

Cloudera的&火花2.0轉換爲數據幀?嗯,並不認爲我們支持,但:) :)

無論如何,首先你不需要在RDD上撥打.persist(),所以你可以刪除那一點。其次,由於Person是一個案例類,你應該把它的名字大寫。

最後,在Spark 2.0中,您不再調用import sqlContext.implicits._隱式構建DataFrame架構,現在您可以撥打import spark.implicits._。這是由您的錯誤消息暗示。

+0

感謝埃裏克。我下面https://spark.apache.org/docs/2.0.0-preview/sql-programming-guide.html#datasets爲RDD轉化爲Dataframes 進口sqlContext.implicits._,我們可以在2.0使用。看起來像問題是與我已經在pom.xml中 提供下面的依賴編碼器 –

+0

<依賴性> org.apache.spark 火花core_2.10 2.0.0 org.apache.spark 火花sql_2.10 2.0.0

+0

這不是您所擁有版本的正確文檔版本。這是2.0.0預覽,隱式導入在2.0.0-preview和2.0.0之間更改。看到這裏:https://spark.apache.org/docs/2.0.0/sql-programming-guide.html –