2017-08-08 44 views
0

我的目標是使用kafka讀取json格式的字符串,對字符串進行過濾,然後將消息下沉(仍在json字符串中格式)。在下沉kafka流時看不到消息,無法在flink 1.2中看到打印消息

出於測試目的,我的輸入字符串信息是這樣的:

{"a":1,"b":2} 

而且我實現的代碼是:

def main(args: Array[String]): Unit = { 

// parse input arguments 
val params = ParameterTool.fromArgs(args) 

if (params.getNumberOfParameters < 4) { 
    println("Missing parameters!\n" 
    + "Usage: Kafka --input-topic <topic> --output-topic <topic> " 
    + "--bootstrap.servers <kafka brokers> " 
    + "--zookeeper.connect <zk quorum> --group.id <some id> [--prefix <prefix>]") 
    return 
} 

val env = StreamExecutionEnvironment.getExecutionEnvironment 
env.getConfig.disableSysoutLogging 
env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000)) 
// create a checkpoint every 5 seconds 
env.enableCheckpointing(5000) 
// make parameters available in the web interface 
env.getConfig.setGlobalJobParameters(params) 

// create a Kafka streaming source consumer for Kafka 0.10.x 
val kafkaConsumer = new FlinkKafkaConsumer010(
    params.getRequired("input-topic"), 
    new JSONKeyValueDeserializationSchema(false), 
    params.getProperties) 

val messageStream = env.addSource(kafkaConsumer) 

val filteredStream: DataStream[ObjectNode] = messageStream.filter(node => node.get("a").asText.equals("1") 
         && node.get("b").asText.equals("2")) 

messageStream.print() 
// Refer to: https://stackoverflow.com/documentation/apache-flink/9004/how-to-define-a-custom-deserialization-schema#t=201708080802319255857 
filteredStream.addSink(new FlinkKafkaProducer010[ObjectNode](
    params.getRequired("output-topic"), 
    new SerializationSchema[ObjectNode] { 
    override def serialize(element: ObjectNode): Array[Byte] = element.toString.getBytes() 
    }, params.getProperties 
)) 

env.execute("Kafka 0.10 Example") 
} 

可以看出,我想打印信息流控制檯並將過濾的消息下載到kafka。但是,我看不到他們。

有趣的是,如果我將KafkaConsumer的模式從JSONKeyValueDeserializationSchema修改爲SimpleStringSchema,我可以看到messageStream打印到控制檯。代碼如下所示:

val kafkaConsumer = new FlinkKafkaConsumer010(
    params.getRequired("input-topic"), 
    new SimpleStringSchema, 
    params.getProperties) 

val messageStream = env.addSource(kafkaConsumer) 
messageStream.print() 

這讓我覺得,如果我用JSONKeyValueDeserializationSchema,我輸入消息實際上是不被接受的卡夫卡。但這似乎很奇怪,並且與在線文檔(https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html

有很大區別希望有人能幫助我!

回答

1

JSONKeyValueDeserializationSchema()需要每個kafka msg的消息密鑰,我假設在通過kafka主題生成併發送JSON消息時沒有提供密鑰。

因此,要解決此問題,請嘗試使用JSONDeserializationSchema(),它僅根據消息接收消息並根據收到的消息創建對象節點。

+0

我其實曾試過。我的程序將在大約30秒內無問題地啓動,但仍然沒有輸出(控制檯和卡夫卡)。然後,我會得到一個錯誤:線程「main」中的異常org.apache.flink.runtime.client.JobExecutionException:作業執行失敗。引起:com.fasterxml.jackson.core.JsonParseException:無法識別的標記'PREFIX':期待('true','false'或'null') at [Source:[B @ 68d8025;行:1,列:8] – teddy

+0

你確定你喂的JSON是正確的嗎?因爲我用示例代碼運行了示例,它在我的最後工作。你可以檢查你的JSON的有效性:https://jsonlint.com/ –

+0

是的,{「a」:1,「b」:2}絕對是一個有效的json(我也檢查過)。我想知道你如何測試我的代碼?我所做的就是使用當地的卡夫卡消費者進行投入,當地的卡夫卡生產者進行輸出。我無法看到flink程序的輸出。 – teddy