我的目標是使用kafka以json格式讀取字符串,對字符串進行過濾,選擇部分消息並下沉消息(仍以json字符串格式)。如何在Flink 1.2中從Kafka中提取部分json格式的字符串
出於測試目的,我的輸入字符串信息是這樣的:
{"a":1,"b":2,"c":"3"}
而且我實現的代碼是:
def main(args: Array[String]): Unit = {
val inputProperties = new Properties()
inputProperties.setProperty("bootstrap.servers", "localhost:9092")
inputProperties.setProperty("group.id", "myTest2")
val inputTopic = "test"
val outputProperties = new Properties()
outputProperties.setProperty("bootstrap.servers", "localhost:9092")
val outputTopic = "test2"
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.disableSysoutLogging
env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
// create a checkpoint every 5 seconds
env.enableCheckpointing(5000)
// create a Kafka streaming source consumer for Kafka 0.10.x
val kafkaConsumer = new FlinkKafkaConsumer010(
inputTopic,
new JSONDeserializationSchema(),
inputProperties)
val messageStream : DataStream[ObjectNode]= env
.addSource(kafkaConsumer).rebalance
val filteredStream: DataStream[ObjectNode] = messageStream.filter(node => node.get("a")
.asText.equals("1") && node.get("b").asText.equals("2"))
// Need help in this part, how to extract for instance a,c and
// get something like {"a":"1", "c":"3"}?
val testStream:DataStream[JsonNode] = filteredStream.map(
node => {
node.get("a")
}
)
testStream.addSink(new FlinkKafkaProducer010[JsonNode](
outputTopic,
new SerializationSchema[JsonNode] {
override def serialize(element: JsonNode): Array[Byte] = element.toString.getBytes()
}, outputProperties
))
env.execute("Kafka 0.10 Example")
}
本規範的註釋顯示,我不知道怎麼樣正確選擇部分消息。我使用map,但我不知道如何連接整個消息。例如,我在代碼中所做的只能給我一個結果爲「1」,但我想要的是{「a」:1,「c」:「3」}
或者,不同的方式來解決這個問題。事情是在火花流媒體有一個「選擇」API,但我無法在Flink找到它。
非常感謝flink社區的幫助!這是我希望在這個小型項目中實現的最後一個功能。
謝謝! makeJSON是Flink的一個內置函數嗎?或者你的意思是我需要自己寫一個函數並放在那裏? – teddy
@teddy不,Flink不包含此類方法,它是一個用於說明的僞代碼。你可以實現一個。不需要很多代碼;) – David
我得到一個錯誤,說鍵控狀態只能用在'鍵控流'上,也就是說,在這一行上的'keyBy()'操作之後(ValueState state = getRuntimeContext()。 getState(new ValueStateDescriptor (「json」,String.class));) –
teddy