Is it possible to get a specific message offset in Spark + KafkaRDD?

I'm trying to read from Kafka with KafkaRDD to get both the messages and their metadata. The approach I'm using is the following:
val messageHandler = (mmd: MessageAndMetadata[String,Array[Byte]]) => {
(mmd.message(), mmd.offset)
}
val messagesAndMetadata = KafkaUtils.createRDD[String, Array[Byte], StringDecoder, DefaultDecoder, Tuple2[String, Long]](
  sc.asInstanceOf[SparkContext], KafkaParams, offsetRangeTrete, leaderForPartitionsMap, messageHandler)
But the compiler reports this error:
ambiguous reference to overloaded definition,
both method createRDD in object KafkaUtils of type
(jsc: org.apache.spark.api.java.JavaSparkContext, keyClass: Class[String], valueClass: Class[String], keyDecoderClass: Class[kafka.serializer.StringDecoder], valueDecoderClass: Class[kafka.serializer.StringDecoder], recordClass: Class[String], kafkaParams: java.util.Map[String,String], offsetRanges: Array[org.apache.spark.streaming.kafka.OffsetRange], leaders: java.util.Map[kafka.common.TopicAndPartition,org.apache.spark.streaming.kafka.Broker], messageHandler: org.apache.spark.api.java.function.Function[kafka.message.MessageAndMetadata[String,String],String])org.apache.spark.api.java.JavaRDD[String]
and method createRDD in object KafkaUtils of type (sc: org.apache.spark.SparkContext, kafkaParams: Map[String,String], offsetRanges: Array[org.apache.spark.streaming.kafka.OffsetRange], leaders: Map[kafka.common.TopicAndPartition,org.apache.spark.streaming.kafka.Broker], messageHandler: kafka.message.MessageAndMetadata[String,String] => String)(implicit evidence$9: scala.reflect.ClassTag[String], implicit evidence$10: scala.reflect.ClassTag[String], implicit evidence$11: scala.reflect.ClassTag[kafka.serializer.StringDecoder], implicit evidence$12: scala.reflect.ClassTag[kafka.serializer.StringDecoder], implicit evidence$13: scala.reflect.ClassTag[String])org.apache.spark.rdd.RDD[String]
match expected type ?
[ERROR] val treteMetadata = org.apache.spark.streaming.kafka.KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder, String]
I'm using Spark 1.3.0. Does anyone have an idea how to solve this?

Thanks
Why don't you use the right types for the parameters? leaderForPartitionsMap must be a Map[TopicAndPartition, org.apache.spark.streaming.kafka.Broker], and you can pass sc directly there without the asInstanceOf. –
Because it seems the compiler cannot distinguish the signature of the method that takes a JavaSparkContext from the one that takes a SparkContext. In any case, the error is the same without the cast. –
Could you paste a link to the full stack trace in a gist? That would help! –
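For reference, here is a minimal sketch of what the first comment suggests: give every argument an explicit type so that only the SparkContext overload of createRDD can match. The topic name, partition, offsets, and broker address below are placeholder assumptions, not values from the question.

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.SparkContext
import org.apache.spark.streaming.kafka.{Broker, KafkaUtils, OffsetRange}

val sc: SparkContext = ???  // a plain SparkContext; no asInstanceOf needed

// Placeholder broker address; adjust to your cluster.
val kafkaParams: Map[String, String] = Map("metadata.broker.list" -> "localhost:9092")

// Hypothetical range: offsets 0 until 100 of partition 0 of "myTopic".
val offsetRanges: Array[OffsetRange] = Array(OffsetRange("myTopic", 0, 0L, 100L))

// Leader map typed exactly as the Scala overload expects.
val leaders: Map[TopicAndPartition, Broker] =
  Map(TopicAndPartition("myTopic", 0) -> Broker("localhost", 9092))

// Annotating the handler pins K = String, V = Array[Byte] and R = (String, Long),
// which rules out the JavaSparkContext overload.
val messageHandler: MessageAndMetadata[String, Array[Byte]] => (String, Long) =
  mmd => (new String(mmd.message()), mmd.offset)

val messagesAndOffsets = KafkaUtils.createRDD[String, Array[Byte], StringDecoder, DefaultDecoder, (String, Long)](
  sc, kafkaParams, offsetRanges, leaders, messageHandler)

With the result type pinned to (String, Long) and Scala Maps used for kafkaParams and leaders, only the Scala variant of createRDD remains applicable. –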