我對卡夫卡有一個非常混淆的問題 - 特別試圖獲取消息的密鑰。卡夫卡消息密鑰 - 字節[]和字符串同時
的關鍵似乎認爲,這既是一個字符串和一個byte []
下面的代碼產生下面的異常:
Map<String, Integer> topicCount = new HashMap<>();
topicCount.put(myConsumer.getTopic(), 1);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = myConsumer.getConsumer().createMessageStreams(topicCount);
List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(myConsumer.getTopic());
System.out.println("Listening to topic " + myConsumer.getTopic());
for (final KafkaStream stream : streams) {
ConsumerIterator<String, byte[]> it = stream.iterator();
while (it.hasNext()) {
System.out.println("Message received from topic");
MessageAndMetadata<String, byte[]> o = it.next();
Object messageKey = o.key();
System.out.println("messageKey is type: " + messageKey.getClass().getName());
System.out.println("messageKey is type: " + messageKey.getClass().getCanonicalName());
System.out.println("o keyDecoder: " + o.keyDecoder());
System.out.println("Key from message: " + o.key()); //This throws exception - [B cannot be cast to java.lang.String
//System.out.println("Key as String: " + new String(o.key(), StandardCharsets.UTF_8)); //uncomment this compile Exception - no suitable constructor found for String(java.lang.String,java.nio.charset.Charset)
byte[] bytesIn = o.message(); //getting the bytes is fine
System.out.println("MessageAndMetadata: " + o);
///other code cut
}
}
例外:
Listening to topic MyKafkaTopic
Message received from topic
messageKey is type: [B
messageKey is type: byte[]
o decoder: [email protected]
[WARNING]
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.String
at com.foo.bar.KafkaCFS.process(KafkaCFS.java:153)
at com.foo.bar.KafkaCFS.run(KafkaCFS.java:63)
at com.foo.bar.App.main(App.java:90)
... 6 more
Maven的:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.9.0.1</version>
</dependency>
如果我取消註釋的System.out行,然後我甚至無法編譯:
[ERROR] COMPILATION ERROR :
[INFO] -------------------------------------------------------------
[ERROR] /C:/Dev/main/java/com/foo/bar/KafkaCFS.java:[152,56] no suitable constructor found for String(java.lang.String,java.nio.charset.Charset)
constructor java.lang.String.String(byte[],int) is not applicable
(argument mismatch; java.lang.String cannot be converted to byte[])
它是如何編譯器認爲關鍵是一個字符串(這是我希望它是),但運行時,它是一個字節陣列?
我能做些什麼來獲得鑰匙作爲一個字符串?
謝謝,
KA。