2016-08-23 108 views
4

我需要通過添加兩個新參數來修改類。這個類與Kryo序列化。 我每次停止流時都會持續保留與此課程有關的信息,其中包括RDD。 當我重新啓動流時,我加載了先前保存的信息,並使用它們在停止和重新啓動時保持一致。Kryo:反序列化舊版本的類

由於I類堅持需要這些新參數,因此我通過爲新參數添加新的kryo.writeObject(output, object, ObjectSerializer)kryo.readObject(input, classOf[Object], ObjectSerializer)來更改類和序列化程序。

現在,無論何時重新啓動我的流,我都會得到一個異常:「遇到未註冊的類...」。

這似乎很明顯,因爲我試圖反序列化一個對象,而這個對象不包含在當我停止流時持久化的信息中。 如果我刪除這些數據並啓動流,就好像它沒有任何先前的運行一樣,則不會發生異常。

有沒有辦法避免這種異常? 也許通過指定一些默認值來防止這些參數丟失?

謝謝

編輯:

我發現一些有用的東西,我沒有看到前: Kryo issue 194

這個傢伙通過簡單地插入一個長的定義他應該使用的解串器版本來實現版本控制。 這是一個簡單的解決方案,但是,由於編寫我正在處理的代碼的公司沒有考慮向前兼容性,我想我必須在新序列化器之前將所有數據保留下來。

請讓我知道如果你有人可以提出更好的解決方案。

編輯2:

這種局面仍然沒有解決。 我試圖按照此處所述使用CompatibleFieldSerializer:CompatibleFieldSerializer Example 因此,通過註冊此序列化程序而不是以前使用的自定義程序。 結果是,現在,當重新加載持久數據時,它給出了一個java.lang.NullPointerException。 如果以前的數據不存在,仍然沒有問題。我可以啓動我的流,序列化新數據,停止流,反序列化並重新啓動我的流。 仍然沒有線索的決議。

回答

2

問題的解決方案是在幾個月前發現的。所以我想盡快回答這個問題。 問題在於,由於代碼中的錯誤,該類使用標準的Kryo FieldSerializer序列化,該序列不兼容。 我們必須執行以下操作來反序列化舊類並將其轉換爲新的序列化類。

的情況是:

case class ClassA(field1 : Long, field2 : String) 

它連載這樣的:

object ClassASerializer extends Serializer[ClassA] with Serializable{ 
    override def write(kryo: Kryo, output: Output, t: ClassA) = { 
     output.writeLong { t.field1 } 
     output.writeString { t.field2 } 
} 
    override def read(kryo: Kryo, input: Input, aClass: Class[ClassA]) = 
     classA( 
      field1 = input.readLong(), 
      field2 = input.readLong() 
     ) 

,並與串行序列化包含這些類序列,以註冊所有人的所有序列化是循環類。

protected def registry: Seq[aClass: Class[A], serializer: Serializer[A]] = ... 
    final def register(kryo: Kryo) = { 
     registry.foreach { registrable => kryo.register(registrable.aClass, registrable.serializer) } 
    } 

該類需要通過添加一個新字段進行修改,該字段是另一個案例類的實例。

爲了進行這樣的改變,我們不得不使用有關的KRYO庫「可選」的註釋,

... 
import com.esotericsoftware.kryo.serializers.FieldSerializer.Optional 
import scala.annotation.meta.field 
... 

case class ClassA(field1 : Long, field2 : String, @(Optional @field)("field3") field3 : ClassB) 

串行器被修改,例如讀取舊的序列化的類時,它可以實例字段3用默認值和,寫入時,寫這樣默認值:

object ClassASerializer extends Serializer[ClassA] with Serializable{ 
    override def write(kryo: Kryo, output: Output, t: ClassA) = { 
     output.writeLong { t.field1 } 
     output.writeString { t.field2 } 
     kryo.writeObject(output, Option { t.field3 } getOrElse ClassB.default, ClassBSerializer) 

} 
    override def read(kryo: Kryo, input: Input, aClass: Class[ClassA]) = 
     ClassA( 
      field1 = input.readLong(), 
      field2 = input.readLong(), 
      field3 = ClassB.default 
     ) 

的KRYO串行登記還修改也註冊可選字段:

protected def registry: Seq[aClass: Class[A], serializer: Serializer[A]] = ... 
    def optionals = Seq("field3") 

    final def register(kryo: Kryo) = { 
     optionals.foreach { optional => 
     kryo.getContext.asInstanceOf[ObjectMap[Any, Any]].put(optional, true) } 
     registry.foreach { registrable => kryo.register(registrable.aClass, registrable.serializer) } 
    } 

因此,我們能夠編寫新版本的序列化類。 之後,我們必須刪除可選註釋,修改序列化程序以便從新的序列化類讀取實際字段,並刪除可選的序列化程序註冊並將其添加到註冊表Seq。

與此同時,我們糾正了強制通過FieldSerializer進行序列化的代碼中的錯誤,但這不在問題的範圍之內。