我想找到一個例子,我可以製作和訂閱卡夫卡的avro郵件。生產和消費卡夫卡Avro郵件沒有匯合組件
在這個時候,我想使用沒有任何融合附加組件的「香草」卡夫卡部署。
這是可能的嗎?我發現的所有例子都很快就開始使用融合特定工具來處理avro消息。
我相信我應該有一種方法可以在kafka平臺上發佈和使用avro消息,並使用任何「分發特定」無插件。
我想找到一個例子,我可以製作和訂閱卡夫卡的avro郵件。生產和消費卡夫卡Avro郵件沒有匯合組件
在這個時候,我想使用沒有任何融合附加組件的「香草」卡夫卡部署。
這是可能的嗎?我發現的所有例子都很快就開始使用融合特定工具來處理avro消息。
我相信我應該有一種方法可以在kafka平臺上發佈和使用avro消息,並使用任何「分發特定」無插件。
當然,你可以做到這一點,沒有任何Confluent工具。但是你必須在你身邊做額外的工作(例如在你的應用程序代碼中) - 這是提供Avro相關工具的最初動機,例如你提到的來自Confluent的工具。
一種方法是直接使用Apache Avro Java API手動序列化/反序列化Kafka消息的有效負載(例如從YourJavaPojo
到byte[]
)。 (我想你認爲Java是選擇的編程語言。)這將如何?這是一個例子。
byte[]
),然後使用Kafka的Java生產者客戶端將寫入編碼負載寫入Kafka主題。byte[]
到Java pojo)。當然,在使用流處理工具如Kafka Streams(將包含在即將推出的Apache Kafka 0.10中)或Apache Storm中時,您也可以直接使用Avro API。
最後,您還可以選擇使用某些實用程序庫(無論是來自Confluent還是其他地方),以便您不必直接使用Apache Avro API。關於它的價值,我已經在kafka-storm-starter,例如,如AvroDecoderBolt.scala所示。在這裏,Avro序列化/反序列化通過使用Scala庫Twitter Bijection完成。下面是AvroDecoderBolt.scala
一個例子片斷給你的總體思路:
// This tells Bijection how to automagically deserialize a Java type `T`,
// given a byte array `byte[]`.
implicit private val specificAvroBinaryInjection: Injection[T, Array[Byte]] =
SpecificAvroCodecs.toBinary[T]
// Let's put Bijection to use.
private def decodeAndEmit(bytes: Array[Byte], collector: BasicOutputCollector) {
require(bytes != null, "bytes must not be null")
val decodeTry = Injection.invert(bytes) // <-- deserialization, using Twitter Bijection, happens here
decodeTry match {
case Success(pojo) =>
log.debug("Binary data decoded into pojo: " + pojo)
collector.emit(new Values(pojo)) // <-- Here we are telling Storm to send the decoded payload to downstream consumers
()
case Failure(e) => log.error("Could not decode binary data: " + Throwables.getStackTraceAsString(e))
}
}
所以,是的,你當然可以選擇不使用任何額外的庫如匯合的Avro的串行器/解串器(目前提供作爲confluentinc/schema-registry部分)或Twitter's Bijection。是否值得額外的努力取決於你自己決定。
如果我理解你的問題,沒有任何內置的方式來生成和加載卡夫卡的avro消息。基本上,您可以使用像fastavro這樣的avro客戶端,在生成Kafka之前立即將其序列化爲avro格式,並在從主題消費後立即加載。 –
那麼像在自定義序列化程序中包裝avro序列化/反序列化邏輯? –
嗯......或多或少,在生成主題之前將其序列化爲avro格式。根據您使用的客戶端,可能會有快捷方式來執行此操作。 –