2013-06-20 102 views
1

我試着寫一些簡單的生產者寫入消息卡夫卡 我已經下載kafka0.7「ClassCastException異常:kafka.message.Message不能轉換爲java.lang.String」卡夫卡生產者

運行卡夫卡服務器,生產者和消費者

> bin/kafka-server-start.sh config/server.properties 
> bin/kafka-console-consumer.sh --zookeeper zooserver:2181 --topic test --from-beginning 
> bin/kafka-console-producer.sh --zookeeper zooserver:2181 --topic test 

而且一切正常!

然後我創建了簡單的消費者,它也有效。

SimpleConsumer consumer = new SimpleConsumer("172.21.110.134", 9092, 10000, 1024000); 
… 
ByteBufferMessageSet messages = consumer.fetch(fetchRequest); 
System.out.println("consumed: " + Utils.toString(msg.message().payload(), "UTF-8")); 

但接下來的生產者總是拋出錯誤

ProducerConfig config = new ProducerConfig(props); 
… 
Producer<String, Message> producer = new Producer<String, Message>(config); 
producer.send(new ProducerData<String, Message>("test", new Message("Hello World".getBytes()))); 

收到以下異常

Exception in thread "main" java.lang.ClassCastException: kafka.message.Message cannot be cast to java.lang.String 
      at kafka.serializer.StringEncoder.toMessage(Encoder.scala:30) 
      at kafka.producer.ProducerPool$$anonfun$send$1$$anonfun$3$$anonfun$apply$1.apply(ProducerPool.scala:107) 

我用Java代碼的Maven依賴

<dependency> 
     <groupId>com.twitter</groupId> 
     <artifactId>kafka_2.9.2</artifactId> 
     <version>0.7.0</version> 
    </dependency> 

    <dependency> 
     <groupId>org.scala-lang</groupId> 
     <artifactId>scala-library</artifactId> 
     <version>2.9.2</version> 
    </dependency> 

P租賃幫助,我做錯了什麼?爲什麼我不能通過簡單的java代碼寫入?

回答

9

的問題是,你已經配置了錯誤的序列化,這應該是

props.put("serializer.class", "kafka.serializer.DefaultEncoder"); 

不是「StringEncoder」

+0

哇感謝,這麼小的事! – Julias

相關問題