
I developed a Java application that reads data from an Avro topic using the Schema Registry, then performs a simple transformation and prints the result to the console. By default I use the GenericAvroSerde class for both keys and values. Everything works fine, except that I have to define additional configuration for each serde, like this:

final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url",
        kafkaStreamsConfig.getProperty("schema.registry.url"));
final Serde<GenericRecord> keyGenericAvroSerde = new GenericAvroSerde();
final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
keyGenericAvroSerde.configure(serdeConfig, true);    // isKey = true
valueGenericAvroSerde.configure(serdeConfig, false); // isKey = false
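
For reference, the registry URL can instead be supplied once through StreamsConfig, so that Kafka Streams configures the default serdes itself. A minimal sketch, assuming the 0.11.x property names (later releases renamed them to DEFAULT_KEY_SERDE_CLASS_CONFIG / DEFAULT_VALUE_SERDE_CLASS_CONFIG); the application id and bootstrap servers are placeholders:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "avro-transform-app");   // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // placeholder
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
// Kafka Streams passes its full configuration on to the serdes' configure()
// calls, so the registry URL set here reaches the Avro serdes as well.
props.put("schema.registry.url", kafkaStreamsConfig.getProperty("schema.registry.url"));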

Without this configuration, I always get an error like this:

Exception in thread "NTB27821-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to deserialize value for record. topic=CH-PGP-LP2_S20-002_agg, partition=0, offset=4482940 
at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:46) 
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84) 
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117) 
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:474) 
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:642) 
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548) 
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519) 
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 69 
Caused by: java.lang.NullPointerException 
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122) 
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:93) 
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55) 
    at io.confluent.kafka.streams.serdes.avro.GenericAvroDeserializer.deserialize(GenericAvroDeserializer.java:63) 
    at io.confluent.kafka.streams.serdes.avro.GenericAvroDeserializer.deserialize(GenericAvroDeserializer.java:39) 
    at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65) 
    at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55) 
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:56) 
    at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:44) 
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84) 
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117) 
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:474) 
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:642) 
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548) 
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519) 

Well, that was unusual, but fine; after adding the configure calls shown above, it worked, and my application was able to perform all its operations and print out the result.

But! When I try to call through() - simply publishing the data to a new topic - I run into the problem I am asking about: the topic is created without a schema. How is that possible?

The interesting fact is that the data is written, but it is: a) in binary format, so a simple consumer cannot read it; b) without a schema, so an Avro consumer cannot read it either:

Processed a total of 1 messages 
[2017-10-05 11:25:53,241] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$:105) 
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 0 
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403 
     at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:182) 
     at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:203) 
     at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:379) 
     at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:372) 
     at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:65) 
     at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:131) 
     at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122) 
     at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:93) 
     at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:122) 
     at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:114) 
     at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:140) 
     at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:78) 
     at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:53) 
     at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala) 
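
The "schema for id 0" in this trace is itself a hint: a Confluent-framed Avro message starts with the magic byte 0x0 followed by a 4-byte big-endian schema id, so an id of 0 usually means the payload was not written by the registry-aware serializer. A minimal diagnostic sketch for inspecting the first bytes of a raw record value (the class and method names are hypothetical):

import java.nio.ByteBuffer;

public class WireFormatCheck {
    // Prints the Confluent wire-format header of a raw Kafka record payload.
    public static void inspect(byte[] payload) {
        if (payload == null || payload.length < 5) {
            System.out.println("too short to be Confluent-framed Avro");
            return;
        }
        ByteBuffer buf = ByteBuffer.wrap(payload);
        byte magic = buf.get();      // expected: 0
        int schemaId = buf.getInt(); // expected: a registered schema id, not 0
        System.out.printf("magic=%d, schemaId=%d%n", magic, schemaId);
    }
}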

Of course I checked the subject in the Schema Registry:

curl -X GET http://localhost:8081/subjects/agg_value_9-value/versions 
{"error_code":40401,"message":"Subject not found."} 

But the same call for another topic, written by the Java application that produces the initial data, shows that a schema exists:

curl -X GET http://localhost:8081/subjects/CH-PGP-LP2_S20-002_agg-value/versions 
[1] 

Both applications use the same "schema.registry.url" configuration. Just to summarize: the topic is created, the data is written and can be read with a simple consumer, but it is binary, and no schema exists.

In addition, I tried to create a schema with Landoop to somehow match the data, but without success. And in any case that would not be a proper way to use Kafka Streams - everything should happen on the fly.

Please help!


Which version do you use? Also, do you set the AvroSerde via 'StreamsConfig' as the default, or do you set it for each operator individually? Did you create the topic manually before starting the application? Also check out this example: https://github.com/confluentinc/kafka-streams-examples/blob/3.3.0-post/src/test/java/io/confluent/examples/streams/GenericAvroIntegrationTest.java#L83-L85 –


I use Confluent 3.3.0, Java 1.8, Kafka 0.11.0.0-cp1, and Avro 1.7.7. I set GenericAvroSerde as the default, but for simple types I override those settings (Serdes.Long, Serdes.String, Serdes.Float). The topic I intend to use does not exist beforehand; it is created when the data is written, in the way I described in the opening post. –


Regarding the example - I do exactly the same, except that the example decodes the key as a byte array while I use the String serde (because the key's Avro schema is just "string"). The Schema Registry URL is reachable; if it were not, I would not be able to read the initial data in the Kafka Streams application at all. And I am able to, and the final stream is printed correctly when I just call finalStream.print() –
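
An operator-level override of that kind, in the 0.11.x API, looks roughly like this - a minimal sketch where the builder and variable names are hypothetical and the topic is the source topic from the question:

KStreamBuilder builder = new KStreamBuilder();
// Key is a plain string, value is Avro: both serdes are set at the source
// instead of relying on the configured defaults.
KStream<String, GenericRecord> source =
    builder.stream(Serdes.String(), valueGenericAvroSerde, "CH-PGP-LP2_S20-002_agg");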

Answer


When through() is called, the default serde defined via StreamsConfig is used unless the user explicitly overrides it. Which default serde did you use? To be correct, you should use the AbstractKafkaAvroSerializer, which will automatically register the schema for the topic.
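
A minimal sketch of such an explicit override in the 0.11.x API, reusing the configured value serde and the topic name from the question (the aggregated stream variable is hypothetical; newer releases express the same thing via through("agg_value_9", Produced.with(...))):

// Passing the Avro serde explicitly makes the registry-aware serializer run
// on write, which registers the schema under the subject "agg_value_9-value".
KStream<String, GenericRecord> piped =
    aggregated.through(Serdes.String(), valueGenericAvroSerde, "agg_value_9");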
