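Kafka Streams created a topic without a schema

I developed a Java application that reads data from an Avro topic using Schema Registry, applies a simple transformation, and prints the result to the console. By default I use the GenericAvroSerde class for both keys and values. Everything works fine, except that I have to define additional configuration for each serde, like this: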
final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", kafkaStreamsConfig.getProperty("schema.registry.url"));
final Serde<GenericRecord> keyGenericAvroSerde = new GenericAvroSerde();
final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
keyGenericAvroSerde.configure(serdeConfig, true);
valueGenericAvroSerde.configure(serdeConfig, false);
Without it, I always get an error like this:
Exception in thread "NTB27821-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to deserialize value for record. topic=CH-PGP-LP2_S20-002_agg, partition=0, offset=4482940
at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:46)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:474)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:642)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 69
Caused by: java.lang.NullPointerException
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:93)
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
at io.confluent.kafka.streams.serdes.avro.GenericAvroDeserializer.deserialize(GenericAvroDeserializer.java:63)
at io.confluent.kafka.streams.serdes.avro.GenericAvroDeserializer.deserialize(GenericAvroDeserializer.java:39)
at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:56)
at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:44)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:474)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:642)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519)
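Well, this is unusual, but fine: after I added the configure calls posted above, it worked, and my application was able to perform all operations and print the result.
But! When I tried to use the through() call, just to publish the data to a new topic, I ran into the problem I am asking about: the topic was created without a schema. How is that possible?
The interesting fact is that the data is being written, but it is: a) in binary format, so a simple consumer cannot read it; b) without a schema, so the Avro consumer cannot read it either: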
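Since the serdes only work once they have been given the registry URL, it may help to build that config map in one place and fail early when the property is missing. The following is a minimal sketch under that assumption; the class name SerdeConfigSketch and the helper serdeConfig are hypothetical and not part of Kafka or the Confluent libraries. The returned map is what would be passed to configure(serdeConfig, true) for the key serde and configure(serdeConfig, false) for the value serde.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper, not part of Kafka or Confluent: builds the config map
// that is handed to Serde.configure(config, isKey) in the snippet above.
final class SerdeConfigSketch {
    static final String SCHEMA_REGISTRY_URL = "schema.registry.url";

    static Map<String, String> serdeConfig(Properties streamsProps) {
        String url = streamsProps.getProperty(SCHEMA_REGISTRY_URL);
        // Fail early with a clear message instead of a later error inside the deserializer.
        if (url == null || url.trim().isEmpty()) {
            throw new IllegalStateException(SCHEMA_REGISTRY_URL + " is not set");
        }
        return Collections.singletonMap(SCHEMA_REGISTRY_URL, url);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(SCHEMA_REGISTRY_URL, "http://localhost:8081");
        System.out.println(serdeConfig(props));
    }
}
```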
Processed a total of 1 messages
[2017-10-05 11:25:53,241] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$:105)
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 0
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:182)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:203)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:379)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:372)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:65)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:131)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:122)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:93)
at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:122)
at io.confluent.kafka.formatter.AvroMessageFormatter.writeTo(AvroMessageFormatter.java:114)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:140)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:78)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:53)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
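For context on "schema for id 0": the Confluent Avro serializers frame each message with one magic byte (0) followed by a four-byte big-endian schema ID, and only then the Avro payload. A consumer that expects this framing reads the ID from whatever bytes it finds there, so an ID of 0 may indicate that the records in the new topic were not written by the Avro serializer. The sketch below (class and method names are hypothetical) inspects a raw record value for that header; it only looks at the first five bytes and does not decode Avro.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical diagnostic helper: checks a raw record value for the
// Confluent framing (magic byte 0 + 4-byte big-endian schema ID).
final class WireFormatSketch {
    static final byte MAGIC_BYTE = 0x0;

    /** Returns the embedded schema ID, or -1 if the payload is too short or lacks the magic byte. */
    static int schemaIdOf(byte[] payload) {
        if (payload == null || payload.length < 5 || payload[0] != MAGIC_BYTE) {
            return -1;
        }
        // ByteBuffer is big-endian by default, matching the framing.
        return ByteBuffer.wrap(payload, 1, 4).getInt();
    }

    public static void main(String[] args) {
        byte[] framed = {0, 0, 0, 0, 69, 0x02};               // header carrying schema ID 69
        byte[] plain = "hello".getBytes(StandardCharsets.UTF_8); // no framing at all
        System.out.println(schemaIdOf(framed));
        System.out.println(schemaIdOf(plain));
    }
}
```

Running such a check against a value fetched with a plain byte-array consumer would show whether the header is present at all.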
Of course I checked the subject in Schema Registry:
curl -X GET http://localhost:8081/subjects/agg_value_9-value/versions
{"error_code":40401,"message":"Subject not found."}
But the same call for another topic, written by a Java application (the producer of the initial data), shows that the schema exists:
curl -X GET http://localhost:8081/subjects/CH-PGP-LP2_S20-002_agg-value/versions
[1]
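The two curl calls above rely on the default subject naming, where the subject is the topic name plus a "-key" or "-value" suffix. A small sketch of how those lookup URLs are derived, assuming that default naming; the class SubjectNameSketch and its methods are hypothetical helpers, and the topic name agg_value_9 is inferred from the subject used in the first curl call.

```java
// Hypothetical helper: derives the Schema Registry subject and the
// "versions" lookup URL for a topic, assuming the default
// "<topic>-key" / "<topic>-value" subject naming.
final class SubjectNameSketch {
    static String subjectFor(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    static String versionsUrl(String registryUrl, String topic, boolean isKey) {
        return registryUrl + "/subjects/" + subjectFor(topic, isKey) + "/versions";
    }

    public static void main(String[] args) {
        String registry = "http://localhost:8081";
        // The output topic of through() and the input topic from the post.
        System.out.println(versionsUrl(registry, "agg_value_9", false));
        System.out.println(versionsUrl(registry, "CH-PGP-LP2_S20-002_agg", false));
    }
}
```

Checking the "-key" subject as well can help narrow down which side, if any, was registered.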
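Both applications use the same "schema.registry.url" configuration. Just to summarize: the topic is created, data is written and can be read with a simple consumer, but it is binary and the schema does not exist.
I also tried to create a schema with Landoop that would somehow match the data, but without success. And by the way, that is not the proper way to use Kafka Streams; everything should be done on the fly.
Please help!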
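Which version do you use? Also, do you set the AvroSerde as the default in `StreamsConfig`, or do you set it for each operator individually? Did you create the topic you use manually before starting the application? Also check out this example: https://github.com/confluentinc/kafka-streams-examples/blob/3.3.0-post/src/test/java/io/confluent/examples/streams/GenericAvroIntegrationTest.java#L83-L85
I use Confluent 3.3.0, Java 1.8, Kafka 0.11.0.0-cp1, and Avro 1.7.7. I set GenericAvroSerde as the default, but for simple types I override these settings (Serdes.Long, Serdes.String, Serdes.Float). The topic I intend to use did not exist beforehand; it was created when the data was written, as I described in the original post.
About the example: I do exactly the same, except that the example decodes the key as a byte array while I use stringSerde (because the Avro schema of the key is just "string"). The Schema Registry URL is reachable; if it were not, I would not be able to read the initial data in the Kafka Streams application. But I can, and the final stream is printed correctly when I just do finalStream.print().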