我們計劃使用Flink處理來自kafka主題的數據流(Logs in Json格式)。 但是,對於那個處理,我們需要使用每天都在變化的輸入文件,而內部的信息可以完全改變(不是格式,而是內容)。 每當其中一個輸入文件發生更改時,我們將不得不將這些文件重新加載到程序中並保持流處理正在進行。數據 重新加載可以做同樣的方式,因爲它現在已經完成: DataSet<String> globalData = e
我想提取由FlinkKafkaConsumer010生成的消息的時間戳作爲數據流中的值。 我知道AssignerWithPeriodicWatermarks類,但這似乎只是通過DataStream API爲時間聚合的目的提取時間戳。 我想在後面的Table中提供該卡夫卡消息時間戳,我可以在其上使用SQL。 編輯:嘗試這樣: val consumer = new FlinkKafkaConsumer