2016-08-11 28 views
0

我正在使用Spark 1.5.2從scala對象使用以下語法創建數據框。我的目的是爲單元測試創​​建一個數據。Spark:SQL上下文:從Scala對象創建數據框

class Address (first:String = null, second: String = null, zip: String = null){} 
class Person (id: String = null, name: String = null, address: Seq[Address] = null){} 

def test() = { 

    val sqlContext = new SQLContext(sc) 

    import sqlContext.implicits._ 

    val persons = Seq(
    new Person(id = "1", name = "Salim", 
     address = Seq(new Address(first = "1st street"))), 
    new Person(name = "Sana", 
     address = Seq(new Address(zip = "60088"))) 
) 

    // The code can't infer schema automatically 
    val claimDF = sqlContext.createDataFrame(sc.parallelize(persons, 2),classOf[Person]) 

    claimDF.printSchema() // This prints "root" not the schema of Person. 
} 

相反,如果我轉換人事和地址,以案例類,然後星火可以自動使用上述語法或使用sc.parallelize(persons, 2).toDF或使用sqlContext.createDataFrame(sc.parallelize(persons, 2),StructType)

我可以繼承模式因爲它不能容納20個以上的字段,所以我們有很多字段。使用StructType會帶來很多不便。案例類最方便,但不能容納太多的屬性。

請幫助,在此先感謝。

+0

我認爲,如果你的類擴展[產品特點](http://www.scala-lang.org/api/2.10.6/#scala.Product),並實施其抽象方法它可能工作。 (由於這個簽名:'createDataFrame [A <:Product](data:Seq [A])') –

回答

0

對代碼進行兩處更改將使printSchema()發出數據框的完整結構而不使用大小寫類。

首先,丹尼爾建議,你需要對你的類擴展scala.Product特質(痛苦的,但需要以下.toDF法):

class Address (first:String = null, second: String = null, zip: String = null) extends Product with Serializable 
{ 
    override def canEqual(that: Any) = that.isInstanceOf[Address] 
    override def productArity: Int = 3 
    def productElement(n: Int) = n match { 
    case 0 => first; case 1 => second; case 2 => zip 
    } 
} 

class Person (id: String = null, name: String = null, address: Seq[Address] = null) extends Product with Serializable 
{ 
    override def canEqual(that: Any) = that.isInstanceOf[Person] 
    override def productArity: Int = 3 
    def productElement(n: Int) = n match { 
    case 0 => id; case 1 => name; case 2 => address 
    } 
} 

其次,你應該創建一個使用.toDF您的數據幀被使得與範圍import sqlContext.implicits._而不是使用sqlContext.createDataFrame(..)像這樣隱式方法:

val claimDF = sc.parallelize(persons, 2).toDF 

然後claimDF.printSchema()將打印:

root 
|-- id: string (nullable = true) 
|-- name: string (nullable = true) 
|-- address: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- first: string (nullable = true) 
| | |-- second: string (nullable = true) 
| | |-- zip: string (nullable = true) 

或者,您可以使用Scala 2.11.0-M3刪除案例類別的22個字段限制。

1

非常感謝您的意見。

我們最終遷移到支持更大案例類的Scala 2.11的Spark 2.1,以便解決此問題。

對於Spark 1.6和Scala 2.10,我最終構建了Row對象和Struct類型來構建一個Dataframe。

val rows = Seq(Row("data")) 
val aRDD = sc.parallelize(rows) 
val aDF = sqlContext.createDataFrame(aRDD,getSchema()) 

def getSchema(): StructType= { 
    StructType(
     Array(
      StructField("jobNumber", StringType, nullable = true)) 
    ) 
}