在flink項目中,我使用了一個case class click。如何在flink中使用joda.time(或者如何使用typeutils.runtime.kryo)
case class click(date: LocalDateTime, stbId:String, channelId :Int)
此類填充數據集,它能正常工作的日期作爲一個Java 8 java.time.LocalDateTime
。在java 7環境中切換到org.joda(version2.9)後,調用DataSet中的單擊對象沒有像以前那樣執行。訪問單擊對象的日期字段的某些功能時,投擲NullPointerExceptions
。這些功能的示例是getHourOfDay
toString
等。我可以確保點擊類的日期字段不爲空。 我懷疑喬達時間庫與kryo序列化不能很好地交互。請參閱joda DateTime format cause null pointer error in spark RDD functions或NPE in spark with Joda DateTime 在Flink API中,有靜態方法registerJodaTime
的org.apache.flink.api.java.typeutils.runtime.kryo.Serializers。這似乎是相關的。我簡單地嘗試過
import org.apache.flink.api.common._
import org.apache.flink.api.java.typeutils.runtime.kryo._
Serializers.registerJodaTime(new ExecutionConfig)
那還不夠。 我對嗎?我如何使用java.typeutils.runtime.kryo?
使用的版本Flink:0.9.1。斯卡拉:2.10 joda.time 2.9
追問: 這裏是確切的添加的代碼作爲建議(感謝費邊和羅伯特)
val env = ExecutionEnvironment.getExecutionEnvironment
//import org.apache.flink.api.common._
import org.apache.flink.api.java.typeutils.runtime.kryo._
Serializers.registerJodaTime(env.getConfig)
在嵌入式執行我能找到的日誌文件下面的相關部分:
16:44:53,998 INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 2 registered types and 0 default Kryo serializers
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo types:
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo with Serializers types:
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo with Serializer Classes types: Entry{k=class org.joda.time.DateTime, v=class de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer},Entry{k=class org.joda.time.Interval, v=class de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer}
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo default Serializers:
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo default Serializers Classes
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered POJO types:
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Static code analysis mode: DISABLE
16:44:54,545 INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
16:44:54,560 DEBUG akka.event.EventStream - logger log1-Slf4jLogger started
....
16:44:57,103 DEBUG org.apache.flink.api.java.typeutils.TypeExtractor - class org.joda.time.LocalDateTime does not contain a getter for field iLocalMillis
16:44:57,103 DEBUG org.apache.flink.api.java.typeutils.TypeExtractor - class org.joda.time.LocalDateTime does not contain a setter for field iLocalMillis
16:44:57,103 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.joda.time.LocalDateTime is not a valid POJO type
16:44:57,275 DEBUG org.apache.flink.api.scala.ClosureCleaner$ - accessedFields: Map()
16:44:57,369 INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 2 registered types and 0 default Kryo serializers
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo types:
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo with Serializers types:
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo with Serializer Classes types: Entry{k=class org.joda.time.DateTime, v=class de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer},Entry{k=class org.joda.time.Interval, v=class de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer}
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo default Serializers:
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered Kryo default Serializers Classes
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Registered POJO types:
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment - Static code analysis mode: DISABLE
不過,我親眼目睹了以下
Exception in thread "main" java.lang.NullPointerException
at org.joda.time.LocalDateTime.isSupported(LocalDateTime.java:625)
at org.joda.time.format.DateTimeFormatterBuilder$PaddedNumber.printTo(DateTimeFormatterBuilder.java:1435)
at org.joda.time.format.DateTimeFormatterBuilder$Composite.printTo(DateTimeFormatterBuilder.java:2474)
at org.joda.time.format.DateTimeFormatter.printTo(DateTimeFormatter.java:655)
at org.joda.time.format.DateTimeFormatter.print(DateTimeFormatter.java:709)
at org.joda.time.LocalDateTime.toString(LocalDateTime.java:2087)
at java.lang.String.valueOf(Unknown Source)
at scala.runtime.StringAdd$.$plus$extension(StringAdd.scala:13)
at myflink.click.toString(Ingestor.scala:20)
...
嗨,這行'Serializers.registerJodaTime(new ExecutionConfig)'正確的方法調用,但它沒有任何效果,因爲它不是來自ExecutionEnvironment的執行配置。 –