I am creating a DataFrame through Spark's Data Source API with the schema below. In Spark 2.1.1, the DataFrame returns the wrong column when select() is used:
StructType(Seq(
  StructField("name", StringType, true),
  StructField("age", IntegerType, true),
  StructField("livesIn", StringType, true),
  StructField("bornIn", StringType, true)))
For now the data is hardcoded inside the buildScan() method of my PrunedFilteredScan implementation, like this:
val schemaFields = schema.fields
// hardcoded for now. Need to read from Accumulo and plug it here
val rec = List("KBN 1000000 Universe Parangipettai", "Sreedhar 38 Mysore Adoni",
  "Siva 8 Hyderabad Hyderabad", "Rishi 23 Blr Hyd", "Ram 45 Chn Hyd", "Abey 12 Del Hyd")
// Reading from Accumulo done. Constructing the RDD now for DF.
val rdd = sqlContext.sparkContext.parallelize(rec)
rdd.count
val rows = rdd.map(rec => {
  val fields = rec.split(" ")
  val typeCastedValues = fields.zipWithIndex.map {
    case (value, index) =>
      val dataType = schemaFields(index).dataType
      typeCast(value, dataType)
  }
  Row.fromSeq(typeCastedValues)
})
rows
private def typeCast(value: String, toType: DataType) = toType match {
  case _: StringType  => value
  case _: IntegerType => value.toInt
}
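As a side note, the match in typeCast is non-exhaustive: any column whose type is neither StringType nor IntegerType would throw a MatchError at runtime. A defensive variant (an illustrative extension, not part of the original code) could fall back to the raw string:

```scala
import org.apache.spark.sql.types.{DataType, IntegerType, StringType}

private def typeCast(value: String, toType: DataType): Any = toType match {
  case _: StringType  => value
  case _: IntegerType => value.toInt
  case _              => value // fallback; extend for other types read from Accumulo as needed
}
```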
When I create the DataFrame as shown below:
val dfPruned = sqlContext.read.format(dsPackage).load().select("livesIn")
dfPruned.show
dfPruned.printSchema
it gives me the data of the name column under the header livesIn. Am I missing something, or is this a bug in Spark 2.1.1? Output:
+--------+
| livesIn|
+--------+
| KBN|
|Sreedhar|
| Siva|
| Rishi|
| Ram|
| Abey|
+--------+
root
|-- livesIn: string (nullable = true)
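One likely cause (an assumption on my part, not confirmed above): with PrunedFilteredScan, Spark passes the pruned column list to buildScan(requiredColumns, filters) and then labels the returned rows with that pruned schema. A scan that always emits all four fields in schema order will therefore show the first field (name) under whichever header was selected. A minimal sketch of a column-aware buildScan, reusing the rec data and typeCast helper from the code above:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.Filter

override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
  // Map each requested column name to its position in the full schema,
  // so each emitted Row contains only the pruned columns, in the order
  // Spark asked for them.
  val fieldIndexes = requiredColumns.map(schema.fieldIndex)
  sqlContext.sparkContext.parallelize(rec).map { line =>
    val fields = line.split(" ")
    val values = fieldIndexes.map(i => typeCast(fields(i), schema.fields(i).dataType))
    Row.fromSeq(values)
  }
}
```

With this shape, select("livesIn") should receive only the third field of each record, so the header and the data stay aligned.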
Thanks Ramesh. However, I need to implement this by extending Spark's Data Source API, without using the createDataFrame() method. –
I have updated my answer. :) I hope it is correct now. –