我所做的是讀取來自kafka的消息以json格式。例如。 {"a":1,"b":2}
然後我應用的濾波器此消息,以確保對應於a的值是1,b的值是2。最後,我想以輸出結果流至下游卡夫卡。但是,我不知道編譯器爲什麼說類型不匹配。 我的代碼如下: val kafkaConsumer = new FlinkKafkaConsumer010(
params.getRequired("input-topi
我的目標是使用kafka以json格式讀取字符串,對字符串進行過濾,選擇部分消息並下沉消息(仍以json字符串格式)。 出於測試目的,我的輸入字符串信息是這樣的: {"a":1,"b":2,"c":"3"}
而且我實現的代碼是: def main(args: Array[String]): Unit = {
val inputProperties = new Properties()
i