2016-11-30 20 views
1

使用Kafka流(版本0.10.0.1)和Kafka代理(0.10.0.1)我試圖根據消息密鑰生成計數。我生產我的郵件使用下面的命令:卡夫卡流不會在countByKey後編寫預期結果

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka-streams-topic --property parse.key=true --property key.separator=, 

當我運行上面的命令我可以給這樣的鍵和值:

1,{"value":10} 

這將消息發送到卡夫卡具有關鍵= 1並且值= {「值」:10}。

我的目標是然後計算有多少消息的密鑰= 1。鑑於上面的命令則將被計爲1

這裏是我使用的代碼:

public class StreamProcessor { 

    public static void main(String[] args) { 
     KStreamBuilder builder = new KStreamBuilder(); 

     final Serde<Long> longSerde = Serdes.Long(); 
     final Serde<String> stringSerde = Serdes.String(); 

     KStream<String, String> values = builder.stream(stringSerde, stringSerde, "kafka-streams-topic"); 

     KStream<String, Long> counts = values 
       .countByKey(stringSerde, "valueCounts") 
       .toStream(); 

     counts.print(stringSerde, longSerde); 
     counts.to(stringSerde, longSerde, "message-counts-topic"); 

     KafkaStreams streams = new KafkaStreams(builder, properties()); 

     streams.start(); 

     Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); 
    } 

    private static Properties properties() { 
     final Properties streamsConfiguration = new Properties(); 

     streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-poc"); 
     streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
     streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); 
     streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); 
     streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); 

     return streamsConfiguration; 
    } 
} 

當我運行counts.print(stringSerde,longSerde)我得到:

1 , 1 

這意味着我有一個密鑰= 1,他們是1個有該密鑰的消息。這是我的期望。

然而,當以下行運行:

counts.to(stringSerde, longSerde, "message-counts-topic"); 

稱爲消息計數話題的話題被髮送給它的消息,但是當我嘗試使用此命令來讀取消息:

./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic message-counts-topic --property print.key=true --property key.separator=, --from-beginning 

我得到以下輸出:

1 , 

凡1是關鍵,沒有什麼是顯示該值。我期望看到消息1,1。但由於某種原因,即使計數值在調用打印方法時顯示,計數值也會丟失。

回答

4

您需要爲bin/kafka-console-consumer.sh指定一個不同的值解串器。添加以下內容:

--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer 

默認字符串反序列化器無法正確讀取長整型值。

+0

下面是完整的命令: /bin/kafka-console-consumer.sh --zookeeper本地主機:2181 --topic消息計數話題--property print.key =真--property key.separator = ,--property value.deserializer = org.apache.kafka.common.serialization.LongDeserializer --from-beginning – crypto