2016-11-02
1

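Kafka message key: byte[] and String at the same time

I have a very confusing problem with Kafka, specifically when trying to get the key of a message.

The key seems to be treated as both a String and a byte[].

The following code produces the exception shown below: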

Map<String, Integer> topicCount = new HashMap<>(); 
    topicCount.put(myConsumer.getTopic(), 1); 

    Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = myConsumer.getConsumer().createMessageStreams(topicCount); 
    List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(myConsumer.getTopic()); 
    System.out.println("Listening to topic " + myConsumer.getTopic()); 
    for (final KafkaStream stream : streams) { 
     ConsumerIterator<String, byte[]> it = stream.iterator(); 
     while (it.hasNext()) { 

      System.out.println("Message received from topic"); 

      MessageAndMetadata<String, byte[]> o = it.next(); 

      Object messageKey = o.key(); 
      System.out.println("messageKey is type: " + messageKey.getClass().getName()); 
      System.out.println("messageKey is type: " + messageKey.getClass().getCanonicalName()); 
      System.out.println("o keyDecoder: " + o.keyDecoder()); 

      System.out.println("Key from message: " + o.key()); //This throws exception - [B cannot be cast to java.lang.String 
      //System.out.println("Key as String: " + new String(o.key(), StandardCharsets.UTF_8)); //uncomment this compile Exception - no suitable constructor found for String(java.lang.String,java.nio.charset.Charset) 

      byte[] bytesIn = o.message();  //getting the bytes is fine 

      System.out.println("MessageAndMetadata: " + o); 

      ///other code cut 
     } 
    } 

Exception:

Listening to topic MyKafkaTopic 
Message received from topic 
messageKey is type: [B 
messageKey is type: byte[] 
o decoder: [email protected] 
[WARNING] 
java.lang.reflect.InvocationTargetException 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.String 
    at com.foo.bar.KafkaCFS.process(KafkaCFS.java:153) 
    at com.foo.bar.KafkaCFS.run(KafkaCFS.java:63) 
    at com.foo.bar.App.main(App.java:90) 
    ... 6 more  

Maven:

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.10</artifactId> 
    <version>0.9.0.1</version> 
</dependency> 

If I uncomment the commented-out System.out line, the code no longer even compiles:

[ERROR] COMPILATION ERROR : 
[INFO] ------------------------------------------------------------- 
[ERROR] /C:/Dev/main/java/com/foo/bar/KafkaCFS.java:[152,56] no suitable constructor found for String(java.lang.String,java.nio.charset.Charset) 
    constructor java.lang.String.String(byte[],int) is not applicable 
    (argument mismatch; java.lang.String cannot be converted to byte[]) 

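How can the compiler think the key is a String (which is what I want it to be), while at runtime it is a byte array?

What can I do to get the key as a String?

Thanks,

KA.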

Answers

2

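The types don't match! You declare the streams as KafkaStream<byte[], byte[]>, but then you write ConsumerIterator<String, byte[]> it = stream.iterator(); and MessageAndMetadata<String, byte[]> o = it.next();. To match the generics, these should be ConsumerIterator<byte[], byte[]> it = stream.iterator(); and MessageAndMetadata<byte[], byte[]>. The mismatch compiles only because the loop variable is a raw KafkaStream, which makes the assignment unchecked. Generics are erased at runtime, so the key is still a byte[], and the cast to String that the compiler inserts fails with the ClassCastException. Once the types are aligned (including for (final KafkaStream<byte[], byte[]> stream : streams)), you can take o.key() and create a String from it: new String(o.key());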
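The following is a minimal sketch of the mechanism, written without any Kafka dependency so it can be run on its own. FakeStream is a hypothetical stand-in for KafkaStream (it is not a Kafka class), and the key "k1" is made up for illustration. It shows why the mismatched declaration compiles yet fails at runtime, and why reading the key into an Object variable (as in the question's messageKey line) does not fail: the compiler inserts a cast only where a String is actually required.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Iterator;

// Hypothetical stand-in for KafkaStream<K, V>; not part of the Kafka API.
final class FakeStream<K> {
    private final K key;

    FakeStream(K key) {
        this.key = key;
    }

    Iterator<K> iterator() {
        return Collections.singletonList(key).iterator();
    }
}

final class ErasureDemo {

    // Mirrors the question: a raw stream assigned to an iterator typed with String keys.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static String readKeyThroughRawType() {
        FakeStream<byte[]> typed = new FakeStream<>("k1".getBytes(StandardCharsets.UTF_8));
        FakeStream raw = typed;                // raw type: the generic parameter is dropped
        Iterator<String> it = raw.iterator();  // unchecked assignment, accepted by the compiler
        try {
            String key = it.next();            // compiler-inserted cast to String fails here
            return "no exception: " + key;
        } catch (ClassCastException e) {
            return e.getClass().getSimpleName();
        }
    }

    // Matching generics: the key is read as byte[] and decoded explicitly.
    static String readKeyWithMatchingGenerics() {
        FakeStream<byte[]> typed = new FakeStream<>("k1".getBytes(StandardCharsets.UTF_8));
        Iterator<byte[]> it = typed.iterator();
        return new String(it.next(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(readKeyThroughRawType());
        System.out.println(readKeyWithMatchingGenerics());
    }
}
```

If the loop variable in the question were declared with its type parameters, the compiler would reject the ConsumerIterator<String, byte[]> assignment outright, which is usually the preferable failure mode.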

1

It is better to keep the KafkaStream generic parameter types as (byte[], byte[]). Try changing the code like this:

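ConsumerIterator<byte[], byte[]> it = stream.iterator(); 
while (it.hasNext()) { 
    // Name the charset explicitly; UTF-8 is an assumption about how the producer encoded the key
    String key = new String(it.next().key(), StandardCharsets.UTF_8); 
    ... 
}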
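The byte[]-to-String conversion can be wrapped in a small helper. The sketch below rests on two assumptions that the original posts do not confirm: that the producer encoded keys as UTF-8 text, and that some messages may carry no key at all (Kafka keys are optional, in which case key() returns null). KeyDecoding and keyAsString are hypothetical names, and "order-42" is a made-up key.

```java
import java.nio.charset.StandardCharsets;

final class KeyDecoding {

    // Decodes a raw Kafka key into a String; returns null for messages without a key.
    // Assumes the producer wrote the key as UTF-8 text.
    static String keyAsString(byte[] keyBytes) {
        if (keyBytes == null) {
            return null;
        }
        return new String(keyBytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] rawKey = "order-42".getBytes(StandardCharsets.UTF_8);
        System.out.println(keyAsString(rawKey));
        System.out.println(keyAsString(null));
    }
}
```

Inside the consumer loop this would be called as keyAsString(it.next().key()). Naming the charset avoids depending on the platform default, which new String(byte[]) would otherwise use.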