https://github.com/miguno/kafka-storm-starter提供了這樣的示例代碼。
例如參見AvroDecoderBolt。從其的javadoc:
此螺栓預計阿夫羅編碼的二進制格式的輸入數據,根據所述T
阿夫羅架構序列化。它會將傳入的數據反序列化爲一個pojo,並將此pojo發送給下游用戶。因此,這個螺栓可以被認爲是Avro數據的Twitter Bijection的Injection.invert[T, Array[Byte]](bytes)
的風暴等效物。
其中
T
:正在使用的類型基於底層阿夫羅架構阿夫羅記錄(例如Tweet
)的。必須是Avro的SpecificRecordBase
的子類。
代碼的關鍵部分是(I摺疊的代碼到這個片斷):
// With T <: SpecificRecordBase
implicit val specificAvroBinaryInjection: Injection[T, Array[Byte]] =
SpecificAvroCodecs.toBinary[T]
val bytes: Array[Byte] = ...; // the Avro-encoded data
val decodeTry: Try[T] = Injection.invert(bytes)
decodeTry match {
case Success(pojo) =>
System.out.println("Binary data decoded into pojo: " + pojo)
case Failure(e) => log.error("Could not decode binary data: " + Throwables.getStackTraceAsString(e))
}