我正在使用FluentD(v.12上一個穩定版本)向Kafka發送消息。但FluentD使用的是舊的KafkaProducer,因此記錄時間戳記始終設置爲-1。 因此,我必須使用WallclockTimestampExtractor將記錄的時間戳設置爲時間點,當郵件到達kafka時。Kafka Streams:如何更改記錄時間戳(0.11.0)?
時間戳我在真的有興趣,由fluentd消息中發送:
「時間戳」: 「1507885936」, 「主人」: 「V.X.Y.Z」在卡夫卡
記錄表示:
偏移= 0,時間戳= - 1,鍵= NULL,值= { 「時間戳」: 「1507885936」, 「宿主」: 「VXYZ」}
我想在卡夫卡這樣的記錄:
偏移= 0,時間戳= 1507885936,鍵= NULL,值= { 「時間戳」: 「1507885936」, 「宿主」: 「VXYZ」}
我的解決方法是這樣的: - 寫一個消費者來提取時間戳(https://kafka.apache.org/0110/javadoc/org/apache/kafka/streams/processor/TimestampExtractor.html)
- 寫一個生產者生產具有時間戳的新紀錄(ProducerRecord(字符串話題,整數分區,長時間戳,K鍵,V值)
我更喜歡KafkaStreams解決方案,如果有的話。
不能跟隨你的問題。你想達到什麼目的? –
謝謝,@ MatthiasJ.Sax! 我編輯了這個問題,並希望我的請求現在更清晰 – sunjazz