2017-08-09 37 views
0

我的目標是使用kafka以json格式讀取字符串,對字符串進行過濾,選擇部分消息並下沉消息(仍以json字符串格式)。如何在Flink 1.2中從Kafka中提取部分json格式的字符串

出於測試目的,我的輸入字符串信息是這樣的:

{"a":1,"b":2,"c":"3"} 

而且我實現的代碼是:

def main(args: Array[String]): Unit = { 

val inputProperties = new Properties() 
inputProperties.setProperty("bootstrap.servers", "localhost:9092") 
inputProperties.setProperty("group.id", "myTest2") 
val inputTopic = "test" 

val outputProperties = new Properties() 
outputProperties.setProperty("bootstrap.servers", "localhost:9092") 
val outputTopic = "test2" 


val env = StreamExecutionEnvironment.getExecutionEnvironment 
env.getConfig.disableSysoutLogging 
env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000)) 
// create a checkpoint every 5 seconds 
env.enableCheckpointing(5000) 

// create a Kafka streaming source consumer for Kafka 0.10.x 
val kafkaConsumer = new FlinkKafkaConsumer010(
    inputTopic, 
    new JSONDeserializationSchema(), 
    inputProperties) 

val messageStream : DataStream[ObjectNode]= env 
    .addSource(kafkaConsumer).rebalance 

val filteredStream: DataStream[ObjectNode] = messageStream.filter(node => node.get("a") 
    .asText.equals("1") && node.get("b").asText.equals("2")) 

// Need help in this part, how to extract for instance a,c and 
// get something like {"a":"1", "c":"3"}? 
val testStream:DataStream[JsonNode] = filteredStream.map(
    node => { 
    node.get("a") 
    } 
) 

testStream.addSink(new FlinkKafkaProducer010[JsonNode](
    outputTopic, 
    new SerializationSchema[JsonNode] { 
    override def serialize(element: JsonNode): Array[Byte] = element.toString.getBytes() 
    }, outputProperties 
)) 

env.execute("Kafka 0.10 Example") 
} 

本規範的註釋顯示,我不知道怎麼樣正確選擇部分消息。我使用map,但我不知道如何連接整個消息。例如,我在代碼中所做的只能給我一個結果爲「1」,但我想要的是{「a」:1,「c」:「3」}

或者,不同的方式來解決這個問題。事情是在火花流媒體有一個「選擇」API,但我無法在Flink找到它。

非常感謝flink社區的幫助!這是我希望在這個小型項目中實現的最後一個功能。

回答

1

Flink流處理作業每處理一次輸入並將其輸出到下一個任務或將其保存到外部存儲器。

一種方法是將所有輸出保存到外部存儲中,如HDFS。流式作業完成後,使用批處理作業將它們組合成JSON。

另一種方法是使用state和RichMapFunction來獲取包含所有鍵值的JSON。

stream.map(new MapFunction<String, Tuple2<String, String>>() { 
    public Tuple2<String, String> map(String value) throws Exception { 
     return new Tuple2<String, String>("mock", value); 
    } 
}).keyBy(0).map(new RichMapFunction<Tuple2<String,String>, String>() { 
    @Override 
    public String map(Tuple2<String, String> value) throws Exception { 
     ValueState<String> old = getRuntimeContext().getState(new ValueStateDescriptor<String>("test", String.class)); 
     String newVal = old.value(); 
     if (newVal != null) makeJSON(newVal, value.f1); 
     else newVal = value.f1; 
     old.update(newVal); 
     return newVal; 
    } 
}).print(); 

並使用此映射函數:filteredStream.map(function);

請注意,使用狀態時,您將看到如下輸出: {「a」:1},{「a」:1,「c」:3}。 最後的輸出應該是你想要的。

+0

謝謝! makeJSON是Flink的一個內置函數嗎?或者你的意思是我需要自己寫一個函數並放在那裏? – teddy

+0

@teddy不,Flink不包含此類方法,它是一個用於說明的僞代碼。你可以實現一個。不需要很多代碼;) – David

+0

我得到一個錯誤,說鍵控狀態只能用在'鍵控流'上,也就是說,在這一行上的'keyBy()'操作之後(ValueState state = getRuntimeContext()。 getState(new ValueStateDescriptor (「json」,String.class));) – teddy