2015-11-30 45 views
1

所以我想實現發送通知kafka生產者到kafka消費者的簡單應用程序。到目前爲止,我已經成功發送String消息到生產者consumer.But當我嘗試發送通知對象kafka消費者沒有收到任何對象。這是我使用的代碼。卡夫卡消費者無法收到序列化對象?

public class Notification implements Serializable{ 

    private String name; 
    private String message; 
    private long currentTimeStamp; 

    public String getName() { 
     return name; 
    } 

    public void setName(String name) { 
     this.name = name; 
    } 

    public String getMessage() { 
     return message; 
    } 

    public void setMessage(String message) { 
     this.message = message; 
    } 

    public long getCurrentTimeStamp() { 
     return currentTimeStamp; 
    } 

    public void setCurrentTimeStamp(long currentTimeStamp) { 
     this.currentTimeStamp = currentTimeStamp; 
    } 

    @Override 
    public boolean equals(Object o) { 
     if (this == o) return true; 
     if (o == null || getClass() != o.getClass()) return false; 

     Notification that = (Notification) o; 

     if (currentTimeStamp != that.currentTimeStamp) return false; 
     if (message != null ? !message.equals(that.message) : that.message != null) return false; 
     if (name != null ? !name.equals(that.name) : that.name != null) return false; 

     return true; 
    } 

    @Override 
    public int hashCode() { 
     int result = name != null ? name.hashCode() : 0; 
     result = 31 * result + (message != null ? message.hashCode() : 0); 
     result = 31 * result + (int) (currentTimeStamp^(currentTimeStamp >>> 32)); 
     return result; 
    } 

    @Override 
    public String toString() { 
     return "Notification{" + 
       "name='" + name + '\'' + 
       ", message='" + message + '\'' + 
       ", currentTimeStamp=" + currentTimeStamp + 
       '}'; 
    } 
} 

這是製片人

public class KafkaProducer { 
    static String topic = "kafka-tutorial"; 


    public static void main(String[] args) { 
     System.out.println("Start Kafka producer"); 
     Properties properties = new Properties(); 
     properties.put("metadata.broker.list", "localhost:9092"); 
     properties.put("serializer.class", "dev.innova.kafka.tutorial.producer.CustomSerializer"); 
     ProducerConfig producerConfig = new ProducerConfig(properties); 
     kafka.javaapi.producer.Producer<String, Notification> producer = new kafka.javaapi.producer.Producer<String, Notification>(producerConfig); 
     KeyedMessage<String, Notification> message = new KeyedMessage<String, Notification>(topic, createNotification()); 
     System.out.println("send Message to broker"); 
     producer.send(message); 
     producer.close(); 

    } 

    private static Notification createNotification(){ 
     Notification notification = new Notification(); 
     notification.setMessage("Sample Message"); 
     notification.setName("Sajith"); 
     notification.setCurrentTimeStamp(System.currentTimeMillis()); 
     return notification; 
    } 
} 

這是消費者

public class KafkaConcumer extends Thread { 
    final static String clientId = "SimpleConsumerDemoClient"; 
    final static String TOPIC = "kafka-tutorial"; 
    ConsumerConnector consumerConnector; 


    public KafkaConcumer() { 
     Properties properties = new Properties(); 
     properties.put("zookeeper.connect","localhost:2181"); 
     properties.put("group.id","test-group"); 
     properties.put("serializer.class", "dev.innova.kafka.tutorial.producer.CustomSerializer"); 
     properties.put("zookeeper.session.timeout.ms", "400"); 
     properties.put("zookeeper.sync.time.ms", "200"); 
     properties.put("auto.commit.interval.ms", "1000"); 
     ConsumerConfig consumerConfig = new ConsumerConfig(properties); 
     consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); 
    } 

    @Override 
    public void run() { 
     Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
     topicCountMap.put(TOPIC, new Integer(1)); 
     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap); 
     KafkaStream<byte[], byte[]> stream = consumerMap.get(TOPIC).get(0); 
     ConsumerIterator<byte[], byte[]> it = stream.iterator(); 
     System.out.println("It :" + it.size()); 
     while(it.hasNext()){ 
      System.out.println(new String(it.next().message())); 
     } 
    } 

    private static void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException { 
     for(MessageAndOffset messageAndOffset: messageSet) { 
      ByteBuffer payload = messageAndOffset.message().payload(); 
      byte[] bytes = new byte[payload.limit()]; 
      payload.get(bytes); 
      System.out.println(new String(bytes, "UTF-8")); 
     } 
    } 
} 

最後我用customserializer序列化和反序列化對象。

public class CustomSerializer implements Encoder<Notification>, Decoder<Notification> { 
    public CustomSerializer(VerifiableProperties verifiableProperties) { 
     /* This constructor must be present for successful compile. */ 
    } 
    @Override 
    public byte[] toBytes(Notification o) { 
     return new byte[0]; 
    } 

    @Override 
    public Notification fromBytes(byte[] bytes) { 
     return null; 
    } 
} 

有人能告訴我什麼是問題?這是正確的方式嗎?

回答

3

你有兩個問題。

首先,您的反序列化器沒有任何邏輯。它爲序列化的每個對象返回一個空字節數組,並且每當它被請求反序列化一個對象時,它都會返回一個空對象。你需要在那裏放置實際序列化和反序列化你的對象的代碼。其次,如果您打算使用JVM中的本機JVM序列化和反序列化邏輯,則需要將serialVersionUID添加到將要傳輸的bean中。像這樣:

private static final long serialVersionUID = 123L; 

您可以使用任何你喜歡的值。當JVM將對象反序列化時,對象中的serialVersionId將與加載的類定義中指定的值進行比較。如果兩者不同,那麼JVM假定即使加載了類定義,也沒有加載類定義的正確版本,並且序列化將失敗。如果您沒有在類定義中爲serialVersionID指定一個值,那麼JVM將爲您創建一個,並且兩個不同的JVM(具有生產者和消費者的JVM)幾乎肯定會爲您創建不同的值。

編輯

你需要讓你的序列化是這個樣子,如果你想利用默認的Java序列化:

public class CustomSerializer implements Encoder<Notification>, Decoder<Notification> { 
    public CustomSerializer(VerifiableProperties verifiableProperties) { 
     /* This constructor must be present for successful compile. */ 
    } 

@Override 
public byte[] toBytes(Notification o) { 
    try { 
     ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
     ObjectOutputStream oos = new ObjectOutputStream(baos); 
     oos.writeObject(o); 
     oos.close(); 
     byte[] b = baos.toByteArray(); 
     return b; 
    } catch (IOException e) { 
     return new byte[0]; 
    } 
} 

@Override 
public Notification fromBytes(byte[] bytes) { 
    try { 
     return (Notification) new ObjectInputStream(new ByteArrayInputStream(b)).readObject(); 
    } catch (Exception e) { 
     return null; 
    }  
} 
+0

感謝克里斯我已經改變你告訴me.It沒路這無助於解決問題。您可以具體告訴我我需要串行器邏輯。 – Sajithv

+0

@ Sajith4U增加了一個例子 –

+0

感謝你的例子,它真的幫助我。 – Sajithv