2016-08-05 32 views
1

我正在尋找一個例子來做類似於GenericRecordBase的Avro SpecificRecordBase對象,或者如果有更簡單的方法來使用AvroSerializer類作爲Kafka鍵和值序列化程序。Bijection - Java Avro Serialization

Injection<GenericRecord, byte[]> genericRecordInjection = 
             GenericAvroCodecs.toBinary(schema); 
byte[] bytes = genericRecordInjection.apply(type); 

回答

1

https://github.com/miguno/kafka-storm-starter提供了這樣的示例代碼。

例如參見AvroDecoderBolt。從其的javadoc:

此螺栓預計阿夫羅編碼的二進制格式的輸入數據,根據所述T阿夫羅架構序列化。它會將傳入的數據反序列化爲一個pojo,並將此pojo發送給下游用戶。因此,這個螺栓可以被認爲是Avro數據的Twitter Bijection的Injection.invert[T, Array[Byte]](bytes)的風暴等效物。

其中

T:正在使用的類型基於底層阿夫羅架構阿夫羅記錄(例如Tweet)的。必須是Avro的SpecificRecordBase的子類。

代碼的關鍵部分是(I摺疊的代碼到這個片斷):

// With T <: SpecificRecordBase 

implicit val specificAvroBinaryInjection: Injection[T, Array[Byte]] = 
SpecificAvroCodecs.toBinary[T] 

val bytes: Array[Byte] = ...; // the Avro-encoded data 
val decodeTry: Try[T] = Injection.invert(bytes) 
decodeTry match { 
    case Success(pojo) => 
    System.out.println("Binary data decoded into pojo: " + pojo) 
    case Failure(e) => log.error("Could not decode binary data: " + Throwables.getStackTraceAsString(e)) 
} 
0
Schema.Parser parser = new Schema.Parser(); 
      Schema schema = parser.parse(new File("/Users/.../schema.avsc")); 
      Injection<Command, byte[]> objectInjection = SpecificAvroCodecs.toBinary(schema); 
      byte[] bytes = objectInjection.apply(c);