0
我在Apache Flink的scala中運行一個簡單的腳本。 我從Apache Kafka製作人處讀取數據。這是我的代碼。卡夫卡/ Flink與地圖功能的集成問題
import java.util.Properties
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.json4s._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization
import org.json4s.native.Serialization.{read, write}
object App {
def main(args : Array[String]) {
case class Sensor2(sensor_name: String, start_date: String, end_date: String, data: String, stt: Int)
val properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val consumer1 = new FlinkKafkaConsumer010[String]("topics1", new SimpleStringSchema(), properties)
val stream1 = env
.addSource(consumer1)
.flatMap(raw => JsonMethods.parse(raw).toOption)
env.execute()
}
}
我上flatMap一個「缺少的參數類型」錯誤(但同樣的錯誤我得到的,當我嘗試使用其他功能,如地圖或過濾器)。 我不知道要解決這個問題。 有什麼幫助嗎?
LF
謝謝。這解決了缺少的參數類型錯誤。 –
爲了將來自Kafka的DataStream [String]解析爲DataStream [Sensor2]類,我不得不對代碼進行一些修改。工作的代碼是: 'VAL consumer1 =新FlinkKafkaConsumer010 [字符串]( 「topics1」,新SimpleStringSchema(),屬性) VAL流1 = ENV .addSource(consumer1) VAL S1 = {stream1.map X = > { 隱式val格式= DefaultFormats JsonMethods.parse(x).extract [Sensor2] } } –