0

我學習卡夫卡流和我有一個字計數的在Java中8的第一個例子,從文檔拍攝的問題。卡夫卡流 - 第一個例子字計數不正確計算第一圈

使用卡夫卡的最新可用版本流,卡夫卡Connect和字計數lambda表達式的例子。

我遵循以下步驟: 我創建卡夫卡輸入話題,輸出一個。啓動應用程序流,然後通過從.txt文件

在第一計數插入一些的話,輸出話題我看到正確的分組詞語的上傳輸入話題,但數是錯誤的。如果我試圖重新插入相同的單詞,那麼以前錯誤計數的連續計數都是正確的。

如果我尋找與消費者控制檯輸入話題轉儲,它的正確加載,並且不存在髒數據。

第一次算錯了怎麼回事?

實施例[第一DATA]: (卡夫卡輸入主題) 喜喜 麥克風麥克風 測試

(應用程序流式傳輸正在運行)

(輸出主題)。在12麥克4試驗3(隨便計數)

[連續數據 - 發佈輸入主題中的相同的話]

(輸出主題)。在14麥克風6試驗4

[新的嘗試]

(輸出主題)。在16麥克風8試驗5

等....

+0

聽起來很奇怪。你能否可靠地再現這個問題?這不應該發生。 –

回答

3

在阿帕奇卡夫卡的字計數演示具有the following lines

// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data 
// Note: To re-run the demo, you need to use the offset reset tool: 
// https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool 
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 

這意味着,當您重新啓動應用程序時,它將從最開始(「最早」)開始讀取其輸入主題iff there is no existi存儲在Kafka中的WordCount應用的消費者偏移量。一定程度的應用不活動後,應用的消費者抵消將在卡夫卡過期,默認值爲24小時(參見offsets.retention.minutesbroker configuration)。

我能想象得到以下事情發生了:

  • 你嘗試了卡夫卡一段時間較早,進入測試數據輸入話題。
  • 然後,在恢復實驗之前,您先進行了> 24小時的休息。
  • 現在的應用程序,當它重新啓動,恢復了從一開始就一路重新讀取輸入的話題,從而拾起舊的測試輸入數據,從而導致「膨脹」計數。

如果我查看使用者控制檯的輸入主題轉儲,它正確加載並且沒有髒數據。

您可以通過控制檯消費者再次在輸入題目看,同時增加了CLI選項--from-beginning(見https://kafka.apache.org/documentation/#quickstart_consume)證實了以上我的假設。

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yourInputTopic --from-beginning 

這將顯示主題「yourInputTopic」中的所有可用數據 - 減去可能已在此期間,卡夫卡主題清除所有數據(默認代理配置將清除比舊數據7天,參見)。

+0

謝謝你的回答。實際上,當我在24小時後進行測試(然後是新的偏移量)時,我正在刪除舊的主題(我啓用了取消)並從頭開始重新創建它們以實現新的乾淨執行。問題再次出現。但是現在我已經在示例代碼中添加了linesConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,「最早的」)行爲,並且似乎運行良好。也許我還沒有解決這個問題,但它的工作原理。 –

+0

太棒了,很高興聽到它現在起作用! –

+0

幾周前我有類似的問題,但有時計數是負數。這可能是由類似的東西造成的嗎? – foxygen