Whenever I try to read messages from the Kafka queue, I get the following exception:
[error] (run-main-0) java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.Customer
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.Customer
at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.infiniteConsumer(AvroSpecificDeserializer.java:79)
at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.main(AvroSpecificDeserializer.java:87)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
Kafka producer code:
import java.io.IOException;
import java.util.Properties;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AvroSpecificProducer {

    private static Properties kafkaProps = new Properties();
    private static KafkaProducer<String, Customer> kafkaProducer;

    static {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        kafkaProps.put("schema.registry.url", "http://localhost:8081");
        kafkaProducer = new KafkaProducer<>(kafkaProps);
    }

    public static void fireAndForget(ProducerRecord<String, Customer> record) {
        kafkaProducer.send(record);
    }

    public static void asyncSend(ProducerRecord<String, Customer> record) {
        kafkaProducer.send(record, (recordMetaData, ex) -> {
            System.out.println("Offset: " + recordMetaData.offset());
            System.out.println("Topic: " + recordMetaData.topic());
            System.out.println("Partition: " + recordMetaData.partition());
            System.out.println("Timestamp: " + recordMetaData.timestamp());
        });
    }

    public static void main(String[] args) throws InterruptedException, IOException {
        Customer customer1 = new Customer(1002, "Jimmy");
        ProducerRecord<String, Customer> record1 = new ProducerRecord<>("CustomerSpecificCountry",
                "Customer One 11 ", customer1
        );
        asyncSend(record1);
        Thread.sleep(1000);
    }
}
Kafka consumer code:
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import io.confluent.kafka.serializers.KafkaAvroDecoder;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.utils.VerifiableProperties;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import static kafka.consumer.Consumer.createJavaConsumerConnector;

public class AvroSpecificDeserializer {

    private static Properties kafkaProps = new Properties();

    static {
        kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "CustomerCountryGroup1");
        kafkaProps.put("zookeeper.connect", "localhost:2181");
        kafkaProps.put("schema.registry.url", "http://localhost:8081");
    }

    public static void infiniteConsumer() throws IOException {
        VerifiableProperties properties = new VerifiableProperties(kafkaProps);
        KafkaAvroDecoder keyDecoder = new KafkaAvroDecoder(properties);
        KafkaAvroDecoder valueDecoder = new KafkaAvroDecoder(properties);

        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put("NewTopic", 1);

        ConsumerConnector consumer = createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(kafkaProps));
        Map<String, List<KafkaStream<Object, Object>>> consumerMap =
                consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        KafkaStream stream = consumerMap.get("NewTopic").get(0);
        ConsumerIterator it = stream.iterator();

        System.out.println("???????????????????????????????????????????????? ");
        while (it.hasNext()) {
            System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ");
            MessageAndMetadata messageAndMetadata = it.next();
            String key = (String) messageAndMetadata.key();
            GenericRecord record = (GenericRecord) messageAndMetadata.message();
            Customer customer = (Customer) SpecificData.get().deepCopy(Customer.SCHEMA$, record);
            System.out.println("Key: " + key);
            System.out.println("Value: " + customer);
        }
    }

    public static void main(String[] args) throws IOException {
        infiniteConsumer();
    }
}
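A note on the exception itself: by default the Confluent KafkaAvroDecoder returns GenericData$Record instances, which cannot be cast to the generated Customer class. A minimal sketch of one commonly cited fix, assuming the decoder honours the Confluent deserializer's 'specific.avro.reader' setting, is to add one property to the consumer configuration before the decoders are built:

```java
// Assumption: with this property set, the Confluent Avro decoder
// deserializes values into generated SpecificRecord classes
// (e.g. Customer) rather than GenericData$Record. Add it to
// kafkaProps before constructing the KafkaAvroDecoder instances.
kafkaProps.put("specific.avro.reader", "true");
```

With that set, messageAndMetadata.message() should already be a Customer, making the GenericRecord cast and the SpecificData.deepCopy step unnecessary; if the cast still fails after this, the generated Customer class and the schema registered for the topic likely disagree.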
I am following these examples:
- https://github.com/confluentinc/examples/blob/3.1.x/kafka-clients/specific-avro-producer/src/main/java/io/confluent/examples/producer/AvroClicksProducer.java
- https://github.com/confluentinc/examples/blob/3.1.x/kafka-clients/specific-avro-consumer/src/main/java/io/confluent/examples/consumer/AvroClicksSessionizer.java
If that doesn't solve the problem, consider the following steps: 1. Check whether the schema registry contains a schema for the corresponding topic. 2. Use 'kafka-avro-console-consumer' to consume your events. That will narrow the problem down to either the producer or the consumer. –
Hey @Javier, I can run the consumer with './kafka-avro-console-consumer --bootstrap-server localhost:2181 --topic CustomerSpecificCountry --from-beginning --property schema.registry.url=http://localhost:8081', but this consumer doesn't receive anything. Is something wrong with my command? –
bootstrap-server should point to one or more of your Kafka brokers, not your ZooKeeper node. –
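The corrected invocation, assuming the broker listens on localhost:9092 as in the producer configuration above, would look like:

```shell
# --bootstrap-server must name a Kafka broker (9092 here), not the
# ZooKeeper node (2181) used in the earlier attempt.
./kafka-avro-console-consumer --bootstrap-server localhost:9092 \
  --topic CustomerSpecificCountry --from-beginning \
  --property schema.registry.url=http://localhost:8081
```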