3

我正在使用FluentD(v.12上一個穩定版本)向Kafka發送消息。但FluentD使用的是舊的KafkaProducer,因此記錄時間戳記始終設置爲-1。 因此,我必須使用WallclockTimestampExtractor將記錄的時間戳設置爲時間點,當郵件到達kafka時。Kafka Streams:如何更改記錄時間戳(0.11.0)?

時間戳我在真的有興趣,由fluentd消息中發送:

「時間戳」: 「1507885936」, 「主人」: 「V.X.Y.Z」在卡夫卡

記錄表示:

偏移= 0,時間戳= - 1,鍵= NULL,值= { 「時間戳」: 「1507885936」, 「宿主」: 「VXYZ」}

我想在卡夫卡這樣的記錄:

偏移= 0,時間戳= 1507885936,鍵= NULL,值= { 「時間戳」: 「1507885936」, 「宿主」: 「VXYZ」}

我的解決方法是這樣的: - 寫一個消費者來提取時間戳(https://kafka.apache.org/0110/javadoc/org/apache/kafka/streams/processor/TimestampExtractor.html

  • 寫一個生產者生產具有時間戳的新紀錄(ProducerRecord(字符串話題,整數分區,長時間戳,K鍵,V值)

我更喜歡KafkaStreams解決方案,如果有的話。

+0

不能跟隨你的問題。你想達到什麼目的? –

+0

謝謝,@ MatthiasJ.Sax! 我編輯了這個問題,並希望我的請求現在更清晰 – sunjazz

回答

5

你可以寫一個很簡單的卡夫卡流應用,如:

KStreamBuilder builder = new KStreamBuilder(); 
builder.stream("input-topic").to("output-topic"); 

,並用自定義TimestampExtractor從記錄中提取的時間戳,並返回其配置的應用程序。

當將記錄寫回到Kafka時,Kafka Streams將使用返回的時間戳。

注意:如果你有無序數據的 - 也就是說,時間戳不嚴格有序的 - 結果將包含亂序時戳了。 Kafka Streams使用返回的時間戳寫回Kafka(即,無論提取器返回什麼,都用作記錄元數據時間戳)。請注意,在寫入時,當前處理的輸入記錄的時間戳用於所有生成的輸出記錄 - 這適用於版本1.0,但在將來的版本中可能會更改)。

+0

這是我一直在尋找的句子:「在將記錄寫回Kafka時,Kafka Streams將使用返回的時間戳。」音樂我的耳朵。謝謝! –

+0

剛剛更新了這個問題:「注意」並不完全正確... –

+0

聽起來不錯,感謝您的更新。我只是試圖找出一個遷移,並且我出汗一會兒不知道是否可以控制輸出時間戳(以便稍後可以與其他主題結合使用) –