I want to parse a CSV with the new Spark 1.6.0 Dataset API, but I'm having some trouble doing it. I want to create a case class for each CSV row. How can I parse the CSV into a Dataset based on that case class?
Here is the code:
case class MyData(forename: String, surname: String, age: Integer)

def toMyData(text: String): Dataset[MyData] = {
  val splits: Array[String] = text.split("\t")
  Seq(MyData(
    forename = splits(0),
    surname = splits(1),
    age = splits(2).asInstanceOf[Integer]
  )).toDS()
}

val lines: Dataset[MyData] = sqlContext.read.text("/data/mydata.csv").as[MyData]
lines.map(r => toMyData(r)).foreach(println)
I guess toMyData just needs some sort of Encoder, but I don't know how to do that properly following the API. Any ideas?
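For reference, this is the kind of encoder-backed conversion I mean; a minimal sketch that works for an in-memory Seq, assuming a SQLContext named sqlContext is already in scope (the values are just placeholders):

import sqlContext.implicits._  // brings the Encoders for primitives and case classes into scope

// Hypothetical in-memory rows, only to show that the case class encodes fine on its own.
val sample = Seq(MyData("John", "Doe", 30)).toDS()
sample.show()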
EDIT:
I've changed the code to this, but I can't even get it to compile:
val lines: Dataset[MyData] = sqlContext.read.text("/data/mydata.csv").as[MyData]
lines.map(r => toMyData(r)).foreach(println)

def toMyData(text: String): Dataset[MyData] = {
  val df = sc.parallelize(Seq(text)).toDF("value")
  df.map(_.getString(0).split("\t") match {
    case Array(fn, sn, age) =>
      MyData(fn, sn, age.asInstanceOf[Integer])
  }).toDS
}

sqlContext.read.text("/data/mydata.csv").as[String].map(r => toMyData(r)).collect().foreach(println)
Because I get:
Error:(50, 10) value toDS is not a member of org.apache.spark.rdd.RDD[MyData]
possible cause: maybe a semicolon is missing before `value toDS'?
}).toDS
^
Error:(54, 133) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing sqlContext.implicits._ Support for serializing other types will be added in future releases.
sqlContext.read.text("/data/mydata.csv").as[String].map(r => toMyData(r)).collect().foreach(println)
I thought '.as[String].transform(r => toMyData(r))' would make sense in some way. Anyway, I'll try your solution. Thanks – Randomize
'toMyData' placed inside a transformation simply cannot work. Datasets are distributed structures and cannot be nested. – zero323
No, it doesn't. Check the types carefully. The map function should be of type 'Row => MyData', not 'Row => Dataset[MyData]'. – zero323
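Following these comments, here is a sketch of the non-nested version: each input line is mapped straight to a MyData value instead of building a Dataset inside the function. It assumes a tab-separated file, that sqlContext.implicits._ is imported, and it uses a helper name (parseLine) introduced only for illustration:

import sqlContext.implicits._

// Hypothetical helper: parses one tab-separated line into a single MyData value.
def parseLine(line: String): MyData = line.split("\t") match {
  case Array(fn, sn, age) => MyData(fn, sn, Integer.valueOf(age.trim))
}

// read.text yields a single string column; .as[String] turns it into a Dataset[String],
// and map(String => MyData) produces a Dataset[MyData] via the case-class encoder.
val myData = sqlContext.read.text("/data/mydata.csv").as[String].map(parseLine)
myData.collect().foreach(println)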