2017-08-30 45 views
0

我開始嘗試卡夫卡數據流。我遵循https://kafka.apache.org/0110/documentation/streams/quickstart卡夫卡字數未更新計數

我的沙盒是一個運行Ubuntu 16.04.2 LTS,Kafka 0.11.0.0和Scala 2.11.11的盒子。

作爲卡夫卡流快速啓動指南中說明,這裏有我跟着步驟:

echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt 

bin/kafka-topics.sh --create \ 
    --zookeeper localhost:2181 \ 
    --replication-factor 1 \ 
    --partitions 1 \ 
    --topic streams-file-input 

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt 

bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo 

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \ 
    --topic streams-wordcount-output \ 
    --from-beginning \ 
    --formatter kafka.tools.DefaultMessageFormatter \ 
    --property print.key=true \ 
    --property print.value=true \ 
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \ 
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer 

當在流-單詞計數輸出通過使用後者命令來看,我的標準輸出顯示以下內容:

all 1 
streams 1 
lead 1 
to 1 
kafka 1 
hello 1 
kafka 2 
streams 2 
join 1 
kafka 3 
summit 1 

然後,在不中斷的bin/kafka-console-consumer.sh命令,我重新運行控制檯製片如下:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt 

我很驚訝標準輸出不會改變,以反映這個新增加的導致的變化。在我的理解中,file-input.txt被用來產生額外的數據,所以字數應該刷新(所有的令牌現在應該被計數兩次)。 我的推理有什麼問題?

+0

當然,'bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo'在整個過程中仍在運行?只是爲了仔細檢查,你還應該在'streams-file-input'主題上運行一個消費者,以確保你真的在那裏添加新的值... –

+0

哦,哦......我沒有注意到WordCountDemo沒有運行了。再次運行它的輸出看起來是正確的。謝謝 !但是,在5秒之後,bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo停止。根據我的理解,這是假設永遠運行。我錯過了什麼? – SCO

回答