2017-09-26 47 views
-1

我正在類轉換異常而消息經過卡夫卡流API,類轉換異常而消息經過卡夫卡流API

例外是:

java.lank/ClassCastException異常:[B不能轉換爲com.fasterxml.jackson.databind.JsonNode

我流的代碼是:

public static void main(String[] args) { 
    Properties config = new Properties(); 
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "BranchingTopics-API"); 
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
      "samsmembershipkafka.dev.cloud.wal-mart.com:9092"); 
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); 
    final Serde<String> stringSerde = Serdes.String(); 
    final Serializer<JsonNode> jsonSerializer = new JsonSerializer(); 
    final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer(); 
    final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, 
      jsonDeserializer); 

    /* 
    * config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, jsonSerde); 
    * config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
    * jsonSerde); 
    */ 

    // Building Stream 
    KStreamBuilder builder = new KStreamBuilder(); 

    KStream<String, JsonNode> textlines = builder.stream("MainTopic"); 

    Predicate<String, JsonNode> isAddComment = (key, value) -> value 
      .get("header").toString().contains("/addComment"); 
    Predicate<String, JsonNode> is1M1C = (key, value) -> value 
      .get("header").toString().contains("/1Member1Account"); 
    Predicate<String, JsonNode> isLostOrStolen = (key, value) -> (value 
      .get("header").toString() 
      .contains("/changeCardStatus?action=STOLEN") || value 
      .get("header").toString() 
      .contains("/changeCardStatus?action=LOST")); 

    KStream<String, JsonNode>[] topicTypes = textlines.branch(isAddComment, 
      is1M1C, isLostOrStolen); 

    topicTypes[0].to(stringSerde, jsonSerde, "CommentsTopic"); 
    topicTypes[1].to(stringSerde, jsonSerde, "OneMemberOneAccountTopic"); 
    topicTypes[2].to(stringSerde, jsonSerde, "LostOrStolenTopic"); 

    KafkaStreams streams = new KafkaStreams(builder, config); 

    streams.start(); 
} 
+0

什麼是完整的堆棧跟蹤?似乎你在某個地方使用了錯誤的Serde。從你的代碼片段中,我會假設你在閱讀主題'builder.stream(stringSerde,jsonSerde,「MainTopic」)時需要指定JSON-Serde;' - 我想你需要爲其他操作指定正確的serdes 。仔細查看堆棧跟蹤,找出哪個操作員拋出異常。 –

+1

是的,這是問題,現在工作,謝謝 – user8677554

+0

把我的評論作爲答案,所以你可以標記問題爲答案。 –

回答

0

似乎你在某個地方使用了錯誤的Serde。從你的代碼片段,我會假設你閱讀的話題時,需要指定JSON-SERDE:

builder.stream(stringSerde, jsonSerde, "MainTopic"); 

我猜你會需要進行其他操作指定正確的Serde S,太。仔細查看堆棧跟蹤,找出哪個操作員拋出異常

相關問題