2017-05-12 61 views
2

自從有些日子以來,我一直在玩合唱版本的kafka,以便更好地理解平臺。我收到了一些發送到一個主題的格式錯誤的avro消息的序列化例外。讓我用事實說明問題:Kafka:民意調查中的反序列化問題

<kafka.new.version>0.10.2.0-cp1</kafka.new.version> 
<confluent.version>3.2.0</confluent.version> 
<avro.version>1.7.7</avro.version> 

意向:很簡單,製片人正在發送的Avro記錄,而消費者應該消耗的所有記錄,沒有任何問題,(它可以保留在架構註冊表架構不兼容的所有消息。 ) 用法:

Producer -> 
Key -> StringSerializer 
Value -> KafkaAvroSerializer 

Consumer -> 
Key -> StringDeserializer 
Value -> KafkaAvroDeserializer 

其他消費屬性(僅供參考):

properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "somehost:9092"); 
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "myconsumer-4"); 
    properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "someclient-4"); 
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class); 
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class); 
    properties.put(AUTO_OFFSET_RESET_CONFIG, "earliest"); 
    properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); 
    properties.put("schema.registry.url", "schemaregistryhost:8081"); 

我能夠使用消息的Wi直到其他生產者錯誤地將一個消息發送到該主題並且修改了模式註冊表中的最新模式爲止。 (我們已經啓用了架構註冊表一個選項,以便您可以發送任何郵件主題和架構註冊表會進行架構每次的新版本中,我們可以,如果關掉了。)現在

,由於這種一個壞消息,poll()失敗,序列化問題。它確實給了我失敗的偏移量,我可以通過使用seek()來傳遞偏移量,但這聽起來不太好。我也嘗試使用最大輪詢記錄到10和poll()超時到非常小,這樣我可以通過捕獲異常來忽略最多10條記錄,但由於某種原因,max-records不起作用,並且代碼失敗並立即出現序列化錯誤,即使我從開始和壞消息是在240偏移量。

properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10"); 

另一種簡單的解決方案是使用ByteArrayDeserializer和我的應用程序中使用KafkaAvroDecoder,我可以對付反序列化的問題。

我相信有東西我失蹤或做錯了。添加例外太:

Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic.ongo.test3.user14-0 at offset 220 
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 186 
Caused by: org.apache.avro.AvroTypeException: Found com.catapult.TestUser, expecting com.catapult.TestUser, missing required field testname 
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292) 
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:88) 
    at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:130) 
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:176) 
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) 
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142) 
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:131) 
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:92) 
    at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:54) 
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:869) 
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:775) 
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:473) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1062) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) 

回答