Debezium連接器的Kafka連接事件是Avro編碼。無法讀取Kafka主題avro消息
在傳遞給Kafka連接獨立服務的connect-standalone.properties中提到了以下內容。
key.converter=io.confluent.connect.avro.AvroConverter
value.confluent=io.confluent.connect.avro.AvroConverter
internal.key.converter=io.confluent.connect.avro.AvroConverter
internal.value.converter=io.confluent.connect.avro.AvroConverter
schema.registry.url=http://ip_address:8081
internal.key.converter.schema.registry.url=http://ip_address:8081
internal.value.converter.schema.registry.url=http://ip_address:8081
配置具有這些特性的卡夫卡消費者代碼:
Properties props = new Properties();
props.put("bootstrap.servers", "ip_address:9092");
props.put("zookeeper.connect", "ip_address:2181");
props.put("group.id", "test-consumer-group");
props.put("auto.offset.reset","smallest");
//Setting auto comit to false to ensure that on processing failure we retry the read
props.put("auto.commit.offset", "false");
props.put("key.converter.schema.registry.url", "ip_address:8081");
props.put("value.converter.schema.registry.url", "ip_address:8081");
props.put("schema.registry.url", "ip_address:8081");
在消費者實施,以下是閱讀的關鍵和值組件的代碼。我使用REST從Schema Registry獲取鍵和值的架構。
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
return reader.read(null, DecoderFactory.get().binaryDecoder(byteData, null));
解析鍵工作正常。在解析消息的值部分時,我得到了ArrayIndexOutOfBoundsException。
下載了Avro的源代碼並進行了調試。發現GenericDatumReader.readInt方法返回一個負值。這個值預計是一個數組的索引(符號),因此應該是正數。
嘗試使用kafka-avro-standalone-consumer使用事件,但它也拋出了ArrayIndexOutOfBoundsException。所以,我的猜測是這個消息在Kafka連接(製作人)&上編碼不正確,問題出在配置上。
以下是問題:
- 這有什麼錯與生產者或消費者通過配置?
- 爲什麼重要的反序列化工作,但沒有價值?
- 還有什麼需要做的事情的工作? (如指定字符編碼的地方)。
- Can Debezium with Avro可用於生產,還是現在的實驗性功能?關於Debezium Avro的文章特別指出,涉及Avro的例子將會包含在內。
已經有很多帖子,在Avro反序列化拋出ArrayIndexOutOfBoundsException但無法將其與我面臨的問題相關聯。