2015-11-11 47 views
3

How to use joda.time in Flink (or how to use typeutils.runtime.kryo)

In a Flink project, I use a case class click.

case class click(date: LocalDateTime, stbId: String, channelId: Int)

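This class populates a DataSet, and it worked fine with the date as a Java 8 java.time.LocalDateTime. After switching to org.joda (version 2.9) in a Java 7 environment, calls on click objects in the DataSet no longer behave as before. Accessing certain functions of the date field of a click object throws NullPointerExceptions. Examples of such functions are getHourOfDay, toString, etc. I can ensure that the date field of the click class is not null.

I suspect that the Joda-Time library does not interact well with Kryo serialization. See "joda DateTime format cause null pointer error in spark RDD functions" and "NPE in spark with Joda DateTime".

In the Flink API, there is a static method registerJodaTime in org.apache.flink.api.java.typeutils.runtime.kryo.Serializers. This seems relevant. I naively tried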

import org.apache.flink.api.common._ 
import org.apache.flink.api.java.typeutils.runtime.kryo._ 
Serializers.registerJodaTime(new ExecutionConfig) 

That was not enough. Am I on the right track? How do I use java.typeutils.runtime.kryo?

Versions used: Flink 0.9.1, Scala 2.10, joda.time 2.9.

Follow-up: here is the exact code I added, as suggested (thanks to Fabian and Robert):

val env = ExecutionEnvironment.getExecutionEnvironment 
//import org.apache.flink.api.common._ 
import org.apache.flink.api.java.typeutils.runtime.kryo._ 
Serializers.registerJodaTime(env.getConfig) 

With embedded execution, I can find the following relevant parts in the log file:

16:44:53,998 INFO org.apache.flink.api.java.ExecutionEnvironment    - The job has 2 registered types and 0 default Kryo serializers 
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment    - Registered Kryo types: 
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment    - Registered Kryo with Serializers types: 
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment    - Registered Kryo with Serializer Classes types: Entry{k=class org.joda.time.DateTime, v=class de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer},Entry{k=class org.joda.time.Interval, v=class de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer} 
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment    - Registered Kryo default Serializers: 
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment    - Registered Kryo default Serializers Classes 
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment    - Registered POJO types: 
16:44:53,998 DEBUG org.apache.flink.api.java.ExecutionEnvironment    - Static code analysis mode: DISABLE 
16:44:54,545 INFO akka.event.slf4j.Slf4jLogger         - Slf4jLogger started 
16:44:54,560 DEBUG akka.event.EventStream          - logger log1-Slf4jLogger started 
.... 
16:44:57,103 DEBUG org.apache.flink.api.java.typeutils.TypeExtractor    - class org.joda.time.LocalDateTime does not contain a getter for field iLocalMillis 
16:44:57,103 DEBUG org.apache.flink.api.java.typeutils.TypeExtractor    - class org.joda.time.LocalDateTime does not contain a setter for field iLocalMillis 
16:44:57,103 INFO org.apache.flink.api.java.typeutils.TypeExtractor     - class org.joda.time.LocalDateTime is not a valid POJO type 
16:44:57,275 DEBUG org.apache.flink.api.scala.ClosureCleaner$      - accessedFields: Map() 
16:44:57,369 INFO org.apache.flink.api.java.ExecutionEnvironment    - The job has 2 registered types and 0 default Kryo serializers 
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment    - Registered Kryo types: 
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment    - Registered Kryo with Serializers types: 
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment    - Registered Kryo with Serializer Classes types: Entry{k=class org.joda.time.DateTime, v=class de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer},Entry{k=class org.joda.time.Interval, v=class de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer} 
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment    - Registered Kryo default Serializers: 
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment    - Registered Kryo default Serializers Classes 
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment    - Registered POJO types: 
16:44:57,369 DEBUG org.apache.flink.api.java.ExecutionEnvironment    - Static code analysis mode: DISABLE 

However, I still see the following:

Exception in thread "main" java.lang.NullPointerException 
    at org.joda.time.LocalDateTime.isSupported(LocalDateTime.java:625) 
    at org.joda.time.format.DateTimeFormatterBuilder$PaddedNumber.printTo(DateTimeFormatterBuilder.java:1435) 
    at org.joda.time.format.DateTimeFormatterBuilder$Composite.printTo(DateTimeFormatterBuilder.java:2474) 
    at org.joda.time.format.DateTimeFormatter.printTo(DateTimeFormatter.java:655) 
    at org.joda.time.format.DateTimeFormatter.print(DateTimeFormatter.java:709) 
    at org.joda.time.LocalDateTime.toString(LocalDateTime.java:2087) 
    at java.lang.String.valueOf(Unknown Source) 
    at scala.runtime.StringAdd$.$plus$extension(StringAdd.scala:13) 
    at myflink.click.toString(Ingestor.scala:20) 
    ... 
+0

Hi, the line 'Serializers.registerJodaTime(new ExecutionConfig)' is the correct method call, but it has no effect because that is not the execution config of the ExecutionEnvironment.

Answers

4

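Flink uses Kryo for types that it cannot serialize itself. LocalDateTime is such a class. Unfortunately, Kryo is also not able to serialize it properly, so we have to tell Kryo how to do it by using a dedicated serializer for this class.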

  1. Add de.javakaffee:kryo-serializers as a dependency:
<dependency> 
    <groupId>de.javakaffee</groupId> 
    <artifactId>kryo-serializers</artifactId> 
    <version>0.30</version> 
</dependency> 

(Please note that adding this dependency might cause problems when using Flink on a cluster. Please let me know.)

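  2. Register the new serializer with the ExecutionEnvironment:
import org.joda.time.LocalDateTime
import de.javakaffee.kryoserializers.jodatime.JodaLocalDateTimeSerializer
val env = ExecutionEnvironment.getExecutionEnvironment
env.registerTypeWithKryoSerializer(classOf[LocalDateTime], classOf[JodaLocalDateTimeSerializer])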
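
Put together, the two steps could look roughly like the sketch below. It assumes the Scala DataSet API of Flink 0.9.x and the kryo-serializers dependency from step 1. The `Click` case class (capitalized here), the object name `JodaKryoSketch` and the sample records are illustrative and not taken from the question.

```scala
import org.apache.flink.api.scala._
import org.joda.time.LocalDateTime
import de.javakaffee.kryoserializers.jodatime.JodaLocalDateTimeSerializer

// Hypothetical stand-in for the `click` case class from the question.
case class Click(date: LocalDateTime, stbId: String, channelId: Int)

object JodaKryoSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Register the serializer on the environment that actually runs the job,
    // before any DataSet is created.
    env.registerTypeWithKryoSerializer(
      classOf[LocalDateTime], classOf[JodaLocalDateTimeSerializer])

    // Made-up sample data.
    val clicks = env.fromElements(
      Click(new LocalDateTime(2015, 11, 11, 16, 44), "stb-1", 7),
      Click(new LocalDateTime(2015, 11, 11, 17, 5), "stb-2", 7))

    // Grouping makes Flink exchange records between operators, so the
    // date field has to be serialized and deserialized along the way.
    val latest = clicks
      .groupBy(_.channelId)
      .reduce((a, b) => if (a.date.isAfter(b.date)) a else b)

    // Calls on the date field are the ones that failed in the question.
    latest.map(c => s"${c.stbId} hour=${c.date.getHourOfDay}").print()
  }
}
```

If the registration takes effect, the calls on `date` in the last line should no longer hit the NullPointerException described in the question.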
    

I hope that helps. (I am keeping the old answer for reference.)


Some general remarks on debugging Kryo/serializer issues in Flink:

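When executing a job locally (this should also work with the ./bin/flink frontend, but then the output is probably in the log/ directory), you should see: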

    14:05:52,863 INFO org.apache.flink.api.java.ExecutionEnvironment    - The job has 15 registered types and 2 default Kryo serializers 
    14:05:52,943 INFO org.apache.flink.runtime.minicluster.FlinkMiniCluster   - Starting FlinkMiniCluster. 
    14:05:53,150 INFO akka.event.slf4j.Slf4jLogger         - Slf4jLogger started 
    

with the number of registered types and Kryo serializers greater than 0.

With a more verbose log level (replace INFO with DEBUG in log4j.properties), you can get more detailed information about the registered serializers:

    14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment    - Registered Kryo types: 
    14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment    - Registered Kryo with Serializers types: 
    14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment    - Registered Kryo with Serializer Classes types: 
    14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment    - Registered Kryo default Serializers: 
    14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment    - Registered Kryo default Serializers Classes 
    14:10:39,935 DEBUG org.apache.flink.api.java.ExecutionEnvironment    - Registered POJO types: 
    
    +0

I added some relevant log output. See the part after "Follow-up" in the question.

    +0

Can you share the full source code of the job you are running? Or maybe a minimal example that reproduces the issue?

    3

You should register the Joda serializers with the ExecutionConfig of the ExecutionEnvironment:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
    Serializers.registerJodaTime(env.getConfig()); 
    

Hope this helps.
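
One observation from the log in the question: after `Serializers.registerJodaTime(env.getConfig)`, the registered serializer classes listed are for org.joda.time.DateTime and org.joda.time.Interval only, while the failing field is an org.joda.time.LocalDateTime. Assuming that is the reason the NPE persists, a possible sketch is to combine this call with an explicit registration for LocalDateTime. The helper `configure` and the object name `RegisterJodaSketch` are hypothetical, and `JodaLocalDateTimeSerializer` comes from the de.javakaffee:kryo-serializers dependency.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers
import org.joda.time.LocalDateTime
import de.javakaffee.kryoserializers.jodatime.JodaLocalDateTimeSerializer

object RegisterJodaSketch {
  // Hypothetical helper: applies both registrations to the environment
  // that is later used to build and run the job.
  def configure(env: ExecutionEnvironment): Unit = {
    // Per the log in the question, this registers serializers for
    // DateTime and Interval.
    Serializers.registerJodaTime(env.getConfig)

    // LocalDateTime does not appear in that log, so it is registered
    // explicitly here (assumption: this is what is missing).
    env.registerTypeWithKryoSerializer(
      classOf[LocalDateTime], classOf[JodaLocalDateTimeSerializer])
  }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    configure(env)
    // ... build the DataSet program on `env` from here on
  }
}
```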

    +0

This definitely looks like a more promising approach. However, trying the code snippet still produces the NPE. Do you suspect some other issue?
