2017-08-08 54 views
0

我所做的是讀取來自kafka的消息以json格式。例如。flink 1.2下沉kafka流的錯誤

{"a":1,"b":2} 

然後我應用的濾波器此消息,以確保對應於a的值是1,b的值是2。最後,我想以輸出結果流至下游卡夫卡。但是,我不知道編譯器爲什麼說類型不匹配。

我的代碼如下:

val kafkaConsumer = new FlinkKafkaConsumer010(
params.getRequired("input-topic"), 
new JSONDeserializationSchema(), 
params.getProperties) 

val messageStream = env.addSource(kafkaConsumer).rebalance 
val filteredStream: DataStream[ObjectNode] = messageStream.filter(jsonNode => jsonNode.get("a").asText.equals("1") 
         && jsonNode.get("b").asText.equals("2")) 

filteredStream.addSink(new FlinkKafkaProducer010[Object](params.getRequired("output-topic"), new SimpleStringSchema, params.getProperties)) 

錯誤我被示出在下面的圖片: enter image description here

我指的是弗林克卡夫卡連接器文件寫卡夫卡outstream代碼: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html

回答

1

你有一個流DataStream類型ObjectNode,所以你需要提供FlinkKafkaProducer010[ObjectNode]例如:

stream1.addSink(new FlinkKafkaProducer010[ObjectNode](params.getRequired("output-topic"), new SerializationSchema[ObjectNode] { 
    override def serialize(element: ObjectNode): Array[Byte] = ??? 
}), params.getProperties) 

所有的通用類型的Java在類型不變,這就是爲什麼你不能僅僅通過FlinkKafkaProducer010[Object]

您可能會遇到的另一個問題是您還需要提供SerializationSchema[ObjectNode]SimpleStringSchema實施SerializationSchema[String]

+0

我該如何提供FlinkKafkaProducer010 [ObjectNode]? – teddy

+0

非常感謝!我的程序現在編譯。我替換你的???通過element.toString.getBytes()。但是,我無法看到我的下游卡夫卡有任何問題。我寫的東西有問題嗎? @Dawid – teddy

1

添加到什麼@Dawid已經指出的那樣,你可以爲ObjectNode提供的順序化模式(假設它是一個POJO,因爲我還沒有對其他對象進行了測試)如下:

TypeInformation<ObjectNode> typeInfo = 
     TypeInformation.of(new TypeHint<ObjectNode>() {}); 
TypeInformationSerializationSchema<ObjectNode> serdeSchema = 
     new TypeInformationSerializationSchema<>(typeInfo, env.getConfig()); 

和然後用serdeschema爲KafkaPrducer沉如下:

FlinkKafkaProducer010<RecordReadEventType> kafkaSink = 
       new FlinkKafkaProducer010<>(
           BOOTSTRAP_SERVERS, 
           "output-topic", 
           serdeSchema); 

希望,這將解決您的問題,卡夫卡水槽衝突。

+0

我解決了衝突(至少我的程序運行)。但是沒有輸出流。我覺得如果我在Kafka消費者中使用JSONDeserializationSchema(在閱讀kafka消息時),我的程序無法讀取任何內容。我發佈了一個問題,如果你能幫助我:https://stackoverflow.com/questions/45564373/cannot-see-message-while-sinking-kafka-stream-and-cannot-see-print-message-in- FL – teddy