-1
我正在類轉換異常而消息經過卡夫卡流API,類轉換異常而消息經過卡夫卡流API
例外是:
java.lank/ClassCastException異常:[B不能轉換爲com.fasterxml.jackson.databind.JsonNode
我流的代碼是:
public static void main(String[] args) {
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "BranchingTopics-API");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
"samsmembershipkafka.dev.cloud.wal-mart.com:9092");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
final Serde<String> stringSerde = Serdes.String();
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer,
jsonDeserializer);
/*
* config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, jsonSerde);
* config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
* jsonSerde);
*/
// Building Stream
KStreamBuilder builder = new KStreamBuilder();
KStream<String, JsonNode> textlines = builder.stream("MainTopic");
Predicate<String, JsonNode> isAddComment = (key, value) -> value
.get("header").toString().contains("/addComment");
Predicate<String, JsonNode> is1M1C = (key, value) -> value
.get("header").toString().contains("/1Member1Account");
Predicate<String, JsonNode> isLostOrStolen = (key, value) -> (value
.get("header").toString()
.contains("/changeCardStatus?action=STOLEN") || value
.get("header").toString()
.contains("/changeCardStatus?action=LOST"));
KStream<String, JsonNode>[] topicTypes = textlines.branch(isAddComment,
is1M1C, isLostOrStolen);
topicTypes[0].to(stringSerde, jsonSerde, "CommentsTopic");
topicTypes[1].to(stringSerde, jsonSerde, "OneMemberOneAccountTopic");
topicTypes[2].to(stringSerde, jsonSerde, "LostOrStolenTopic");
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
}
什麼是完整的堆棧跟蹤?似乎你在某個地方使用了錯誤的Serde。從你的代碼片段中,我會假設你在閱讀主題'builder.stream(stringSerde,jsonSerde,「MainTopic」)時需要指定JSON-Serde;' - 我想你需要爲其他操作指定正確的serdes 。仔細查看堆棧跟蹤,找出哪個操作員拋出異常。 –
是的,這是問題,現在工作,謝謝 – user8677554
把我的評論作爲答案,所以你可以標記問題爲答案。 –