
I want to parse a CSV with the new Spark 1.6.0 Dataset API, but I am having some trouble doing it. I want to create a case class for each CSV row. How can I parse a CSV into a Dataset based on a case class?

Here is the code:

    case class MyData(forename: String, surname: String, age: Integer)

    def toMyData(text: String): Dataset[MyData] = {
      val splits: Array[String] = text.split("\t")
      Seq(MyData(
        forename = splits(0),
        surname = splits(1),
        age = splits(2).asInstanceOf[Integer]
      )).toDS()
    }

    val lines: Dataset[MyData] = sqlContext.read.text("/data/mydata.csv").as[MyData]
    lines.map(r => toMyData(r)).foreach(println)

toMyData is supposed to act as a kind of Encoder, but I don't know how to do this properly with the API.

Any ideas?

EDIT:

I have changed the code like this, but I can't even get it to compile:


    def toMyData(text: String): Dataset[MyData] = {
      val df = sc.parallelize(Seq(text)).toDF("value")

      df.map(_.getString(0).split("\t") match {
        case Array(fn, sn, age) =>
          MyData(fn, sn, age.asInstanceOf[Integer])
      }).toDS
    }

    sqlContext.read.text("/data/mydata.csv").as[String].map(r => toMyData(r)).collect().foreach(println)

because I get:

Error:(50, 10) value toDS is not a member of org.apache.spark.rdd.RDD[MyData] 
possible cause: maybe a semicolon is missing before `value toDS'? 
     }).toDS 
     ^
Error:(54, 133) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing sqlContext.implicits._ Support for serializing other types will be added in future releases. 
    sqlContext.read.text("/data/mydata.csv").as[String].map(r => toMyData(r)).collect().foreach(println) 

Answer


Ignoring format validation and exception handling:

    // Simulate sqlContext.read.text("/data/mydata.csv")
    import sqlContext.implicits._ // needed for toDF and toDS

    val df = sc.parallelize(Seq("John\tDoe\t22")).toDF("value")

    df.rdd.map(_.getString(0).split("\t") match {
      case Array(fn, sn, age) => MyData(fn, sn, age.toInt)
    }).toDS
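
For completeness, a minimal end-to-end sketch under the same assumptions (a tab-separated file at the question's path /data/mydata.csv, with no malformed rows):

    import org.apache.spark.sql.Dataset
    import sqlContext.implicits._

    // Read each line into the single "value" column, split on tabs,
    // and map the three fields onto the case class
    val ds: Dataset[MyData] = sqlContext.read.text("/data/mydata.csv")
      .rdd
      .map(_.getString(0).split("\t") match {
        case Array(fn, sn, age) => MyData(fn, sn, age.toInt)
      })
      .toDS()

    ds.show()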

Or without converting to RDDs:

    import org.apache.spark.sql.functions.regexp_extract
    import sqlContext.implicits._ // for the $"value" column syntax

    val pattern = "^(.*?)\t(.*?)\t(.*)$"
    val exprs = Seq(
      (1, "forename", "string"), (2, "surname", "string"), (3, "age", "integer")
    ).map { case (i, n, t) => regexp_extract($"value", pattern, i).alias(n).cast(t) }

    df
      .select(exprs: _*) // convert to (StringType, StringType, IntegerType)
      .as[MyData]        // cast to the case class
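
Putting this together with the actual read (again just a sketch, reusing the exprs sequence defined above):

    val parsed = sqlContext.read.text("/data/mydata.csv")
      .select(exprs: _*)
      .as[MyData]

    parsed.show()

Staying in the DataFrame API like this keeps the parsing as Catalyst column expressions rather than opaque lambdas, and it avoids the round trip through RDDs entirely.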

To summarize:

  • Don't nest actions, transformations, or distributed data structures such as Datasets and RDDs.
  • Read up on how asInstanceOf works first; it doesn't apply here (see the sketch below this list).
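
A minimal illustration of that last point (not part of the original answer): asInstanceOf is a cast, not a conversion, so it cannot turn a String into an Integer.

    val s = "22"
    // s.asInstanceOf[Integer] // compiles, but throws ClassCastException at
    //                         // runtime: a String is not an Integer
    val age: Int = s.toInt     // parses the string instead, which is what we want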

I thought something like '.as[String].transform(r => toMyData(r))' would make sense. Anyway, I'll try your solution. Thanks – Randomize


'toMyData' placed inside a transformation cannot work at all. Datasets are distributed structures and cannot be nested. – zero323


No, it doesn't. Check the types carefully. The type of the map function is 'Row => MyData', not 'Row => Dataset[MyData]'. – zero323