2017-08-04 168 views
0

Debezium連接器的Kafka連接事件是Avro編碼。無法讀取Kafka主題avro消息

在傳遞給Kafka連接獨立服務的connect-standalone.properties中提到了以下內容。

key.converter=io.confluent.connect.avro.AvroConverter 
value.confluent=io.confluent.connect.avro.AvroConverter 
internal.key.converter=io.confluent.connect.avro.AvroConverter 
internal.value.converter=io.confluent.connect.avro.AvroConverter 
schema.registry.url=http://ip_address:8081 
internal.key.converter.schema.registry.url=http://ip_address:8081 
internal.value.converter.schema.registry.url=http://ip_address:8081 

配置具有這些特性的卡夫卡消費者代碼:

Properties props = new Properties(); 
props.put("bootstrap.servers", "ip_address:9092"); 
props.put("zookeeper.connect", "ip_address:2181"); 
props.put("group.id", "test-consumer-group"); 
props.put("auto.offset.reset","smallest"); 
//Setting auto comit to false to ensure that on processing failure we retry the read 
props.put("auto.commit.offset", "false"); 
props.put("key.converter.schema.registry.url", "ip_address:8081"); 
props.put("value.converter.schema.registry.url", "ip_address:8081"); 
props.put("schema.registry.url", "ip_address:8081"); 

在消費者實施,以下是閱讀的關鍵和值組件的代碼。我使用REST從Schema Registry獲取鍵和值的架構。

GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema); 
return reader.read(null, DecoderFactory.get().binaryDecoder(byteData, null)); 

解析鍵工作正常。在解析消息的值部分時,我得到了ArrayIndexOutOfBoundsException。

下載了Avro的源代碼並進行了調試。發現GenericDatumReader.readInt方法返回一個負值。這個值預計是一個數組的索引(符號),因此應該是正數。

嘗試使用kafka-avro-standalone-consumer使用事件,但它也拋出了ArrayIndexOutOfBoundsException。所以,我的猜測是這個消息在Kafka連接(製作人)&上編碼不正確,問題出在配置上。

以下是問題:

  1. 這有什麼錯與生產者或消費者通過配置?
  2. 爲什麼重要的反序列化工作,但沒有價值?
  3. 還有什麼需要做的事情的工作? (如指定字符編碼的地方)。
  4. Can Debezium with Avro可用於生產,還是現在的實驗性功能?關於Debezium Avro的文章特別指出,涉及Avro的例子將會包含在內。

已經有很多帖子,在Avro反序列化拋出ArrayIndexOutOfBoundsException但無法將其與我面臨的問題相關聯。

回答

相關問題