2017-07-03 35 views
1

我正在使用下面的模式使用Spark的數據源API創建DataFrame。Spark 2.1.1使用select()方法時,DataFrame給出錯誤的列

StructType(Seq(StructField("name", StringType, true), 
         StructField("age", IntegerType, true), 
         StructField("livesIn", StringType, true), 
         StructField("bornIn", StringType, true))) 

我硬編碼的數據作爲PrunedFilteredScan的buildScan如下所示()方法:

val schemaFields = schema.fields 
// hardcoded for now. Need to read from Accumulo and plug it here 
val rec = List("KBN 1000000 Universe Parangipettai", "Sreedhar 38 Mysore Adoni", "Siva 8 Hyderabad Hyderabad", 
       "Rishi 23 Blr Hyd", "Ram 45 Chn Hyd", "Abey 12 Del Hyd") 

// Reading from Accumulo done. Constructing the RDD now for DF. 
val rdd = sqlContext.sparkContext.parallelize(rec)   
rdd.count 
val rows = rdd.map(rec => { 
    //println("file ===============>"+file) 
    val fields = rec.split(" ") 

    val typeCastedValues = fields.zipWithIndex.map{ 
    case (value, index) => { 
     //println(s"PRUNED val: ${value} - index: ${index}") 

     val dataType = schemaFields(index).dataType 
     typeCast(value, dataType) 
    } 
    } 
    Row.fromSeq(typeCastedValues) 
}) 
rows } 
private def typeCast(value: String, toType: DataType) = toType match { 
case _: StringType  => value 
case _: IntegerType  => value.toInt } 

當我創建數據框,如下圖所示:

val dfPruned = sqlContext.read.format(dsPackage).load().select("livesIn") 
dfPruned.show 
dfPruned.printSchema 

它給了我name列的數據頭文件爲livesIn。如果我丟失任何東西,或請幫助這是Spark的錯誤2.1.1 Ouput

+--------+ 
| livesIn| 
+--------+ 
|  KBN| 
|Sreedhar| 
| Siva| 
| Rishi| 
|  Ram| 
| Abey| 
+--------+ 

root 
|-- livesIn: string (nullable = true) 

回答

0

您應該創建dataframe當你有schema,當你已經將您rdd作爲Rows作爲

sqlContext.createDataFrame(rows, schema) 

然後,當你做

val dfPruned = sqlContext.createDataFrame(rows, schema).select("livesIn") 
dfPruned.show 
dfPruned.printSchema 

英語新HOULD越來越輸出

+---------+ 
| livesIn| 
+---------+ 
| Universe| 
| Mysore| 
|Hyderabad| 
|  Blr| 
|  Chn| 
|  Del| 
+---------+ 

root 
|-- livesIn: string (nullable = true) 

編輯

如果你想使用數據源API,那麼它更簡單

sqlContext.read.format("csv").option("delimiter", " ").schema(schema).load("path to your file ").select("livesIn") 

應該做的伎倆。

注意:我使用的輸入文件如下

KBN 1000000 Universe Parangipettai 
Sreedhar 38 Mysore Adoni 
Siva 8 Hyderabad Hyderabad 
Rishi 23 Blr Hyd 
Ram 45 Chn Hyd 
Abey 12 Del Hyd 
+0

謝謝Ramesh。但是,我需要實現這個擴展Spark的數據源API,但不使用createDataFrame()方法。 –

+0

我已經更新了我的答案。 :)我希望我知道它是正確的。 –

0

如果你想申請您的RDD的模式可以參考以下使用createDataFrame功能。

// create a row from your data by splitting wit " " 
    val rows = rdd.map(value => { 
     val data = value.split(" ") 
    // you could use Rows.fromSeq(data) but since you need second field as int needs conversion 

     Row(data(0), data(1).toInt, data(2), data(3)) 
    }) 

    //creating a dataframe with rows and schema 
    val df = sparkContext.createDataFrame(rows, schema) 


    // selecting only column livesIn 
    df.select("livesIn") 

輸出:

+---------+ 
| livesIn| 
+---------+ 
| Universe| 
| Mysore| 
|Hyderabad| 
|  Blr| 
|  Chn| 
|  Del| 
+---------+ 

希望這是有幫助的!

+0

謝謝Shankar。但是,我需要實現這個擴展Spark的數據源API,但不使用createDataFrame()方法。 –

相關問題