我所做的是讀取來自kafka的消息以json格式。例如。flink 1.2下沉kafka流的錯誤
{"a":1,"b":2}
然後我應用的濾波器此消息,以確保對應於a的值是1,b的值是2。最後,我想以輸出結果流至下游卡夫卡。但是,我不知道編譯器爲什麼說類型不匹配。
我的代碼如下:
val kafkaConsumer = new FlinkKafkaConsumer010(
params.getRequired("input-topic"),
new JSONDeserializationSchema(),
params.getProperties)
val messageStream = env.addSource(kafkaConsumer).rebalance
val filteredStream: DataStream[ObjectNode] = messageStream.filter(jsonNode => jsonNode.get("a").asText.equals("1")
&& jsonNode.get("b").asText.equals("2"))
filteredStream.addSink(new FlinkKafkaProducer010[Object](params.getRequired("output-topic"), new SimpleStringSchema, params.getProperties))
我指的是弗林克卡夫卡連接器文件寫卡夫卡outstream代碼: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html
我該如何提供FlinkKafkaProducer010 [ObjectNode]? – teddy
非常感謝!我的程序現在編譯。我替換你的???通過element.toString.getBytes()。但是,我無法看到我的下游卡夫卡有任何問題。我寫的東西有問題嗎? @Dawid – teddy