2017-08-01 64 views
0

我在Apache Flink的scala中運行一個簡單的腳本。 我從Apache Kafka製作人處讀取數據。這是我的代碼。卡夫卡/ Flink與地圖功能的集成問題

import java.util.Properties 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment 
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010 
import org.apache.flink.streaming.util.serialization.SimpleStringSchema 
import org.json4s._ 
import org.json4s.native.JsonMethods._ 
import org.json4s.native.Serialization 
import org.json4s.native.Serialization.{read, write} 

object App { 

    def main(args : Array[String]) { 

case class Sensor2(sensor_name: String, start_date: String, end_date: String, data: String, stt: Int) 

val properties = new Properties(); 
    properties.setProperty("bootstrap.servers", "localhost:9092"); 
    properties.setProperty("group.id", "test"); 

    val env = StreamExecutionEnvironment.getExecutionEnvironment() 
    val consumer1 = new FlinkKafkaConsumer010[String]("topics1", new SimpleStringSchema(), properties) 
    val stream1 = env 
    .addSource(consumer1) 
    .flatMap(raw => JsonMethods.parse(raw).toOption) 

    env.execute() 

} 

} 

我上flatMap一個「缺少的參數類型」錯誤(但同樣的錯誤我得到的,當我嘗試使用其他功能,如地圖或過濾器)。 我不知道要解決這個問題。 有什麼幫助嗎?

LF

回答

0

您應該使用的StreamExecutionEnvironment階API版本。

改變你的進口:

import java.util.Properties 
import org.apache.flink.streaming.api.scala._ 
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010 
import org.apache.flink.streaming.util.serialization.SimpleStringSchema 
import org.json4s._ 
import org.json4s.native.JsonMethods 
+0

謝謝。這解決了缺少的參數類型錯誤。 –

+0

爲了將來自Kafka的DataStream [String]解析爲DataStream [Sensor2]類,我不得不對代碼進行一些修改。工作的代碼是: 'VAL consumer1 =新FlinkKafkaConsumer010 [字符串]( 「topics1」,新SimpleStringSchema(),屬性) VAL流1 = ENV .addSource(consumer1) VAL S1 = {stream1.map X = > { 隱式val格式= DefaultFormats JsonMethods.parse(x).extract [Sensor2] } } –