我使用Spark Streaming從Kafka主題列表中讀取。 我正在關注此link的官方API。我正在使用的方法是:Spark Streaming + Kafka:如何從kafka消息檢查主題名稱
val kafkaParams = Map("metadata.broker.list" -> configuration.getKafkaBrokersList(), "auto.offset.reset" -> "largest")
val topics = Set(configuration.getKafkaInputTopic())
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)
我想知道執行器如何從主題列表中讀取消息?他們的政策是什麼?他們會閱讀一個主題,然後當他們完成消息傳遞給其他主題?
最重要的是,我怎麼能在調用這個方法後,檢查RDD中的消息的主題是什麼?
stream.foreachRDD(rdd => rdd.map(t => {
val key = t._1
val json = t._2
val topic = ???
})
您可以使用地圖像VAR記錄= stream.map(記錄=>(record.topic)) –
@ israel.zinc我認爲'stream'中的元素是'Tuple2 [String,String]'。有沒有參數或方法稱爲主題 – salvob