2017-07-19 45 views
4
  • 星火2.1.1
  • 斯卡拉2.11.8
  • 的Java 8
  • Linux操作系統Ubuntu 16.04 LTS

我想我RDD轉變成一個數據集。對於這一點,我用的是implicits方法toDS(),給我下面的錯誤:如何在Datasets中使用java.time.LocalDate(失敗,發現java.lang.UnsupportedOperationException:沒有找到編碼器)?

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for java.time.LocalDate 
- field (class: "java.time.LocalDate", name: "date") 
- root class: "observatory.TemperatureRow" 
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:602) 
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:596) 
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:587) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 
    at scala.collection.immutable.List.foreach(List.scala:381) 
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) 
    at scala.collection.immutable.List.flatMap(List.scala:344) 

在我的情況,我必須使用類型java.time.LocalDate,我不能使用java.sql.data。我已閱讀,我需要informe星火如何互感器Java類型到SQL類型,我這個方向,我建立以下2個implicits功能:

implicit def toSerialized(t: TemperatureRow): EncodedTemperatureRow = EncodedTemperatureRow(t.date.toString, t.location, t.temperature) 
implicit def fromSerialized(t: EncodedTemperatureRow): TemperatureRow = TemperatureRow(LocalDate.parse(t.date), t.location, t.temperature) 

下面,我的應用程序的一些代碼:

case class Location(lat: Double, lon: Double) 

case class TemperatureRow(
          date: LocalDate, 
          location: Location, 
          temperature: Double 
         ) 

case class EncodedTemperatureRow(
          date: String, 
          location: Location, 
          temperature: Double 

val s = Seq[TemperatureRow](
        TemperatureRow(LocalDate.parse("2017-01-01"), Location(1.4,5.1), 4.9), 
        TemperatureRow(LocalDate.parse("2014-04-05"), Location(1.5,2.5), 5.5) 
       ) 

import spark.implicits._ 
val temps: RDD[TemperatureRow] = sc.parallelize(s) 
val tempsDS = temps.toDS 

我不知道爲什麼星火搜索編碼器,用於java.time.LocalDate,我提供TemperatureRowEncodedTemperatureRow隱式轉換...

+1

您提供的隱式轉換對Spark來說簡直沒用 - 框架如何「知道」將對象轉換爲「EncodedTemperatureRow」? Spark需要使用類型爲'org.apache.spark.sql.Encoder [T]'的隱式值來編碼'T'類型的值,因此您需要提供一個不合適的'Encoder [TemperatureRow]'。創建這樣的編碼器不是微不足道的,請參閱https://stackoverflow.com/a/39442829/5344058 –

回答

3

java.time.LocalDate不支持Spark 2.2(我一直試圖編寫一個Encoder類型的一段時間和failed)。

您必須將java.time.LocalDate轉換爲其他支持的類型,java.sql.Timestampjava.sql.Date是受支持的候選。

+1

並澄清此處着陸的任何人:甚至Spark的2.2版本都不處理JDK8日期/時間類。 https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala中的轉換僅查看預編譯器JDK8日期/時間類。 – wishihadabettername

相關問題