Spark Streaming loses SparkContext

I have a somewhat atypical problem. When I try to process an RDD received from Kafka, I get a java.lang.NullPointerException as soon as I access sparkContext. The RDDProcessor class that does the processing is serializable:
def convertArrayToDataFrame(byteArray: Array[Byte], sqlContext: SQLContext) = {
  val stringFromByte = b2s(byteArray)
  val rdd = sqlContext.sparkContext.parallelize(stringFromByte.split("\n"))
  val rows = rdd.map(_.split(";")).map(attributes => Row.fromSeq(attributes.toSeq))
  val dataframe = sqlContext.createDataFrame(rows, RDDComponents.schema)
  dataframe
}
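For context, this snippet references a b2s helper and an RDDComponents.schema that are not shown here; a minimal sketch of what they might look like (the field names and column layout are assumptions, not my actual code) is:

import java.nio.charset.StandardCharsets
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Assumed helper: decode the Kafka payload bytes into a String.
def b2s(bytes: Array[Byte]): String = new String(bytes, StandardCharsets.UTF_8)

// Assumed schema object: one StringType column per ';'-separated attribute.
object RDDComponents {
  val schema: StructType = StructType(Seq(
    StructField("col1", StringType, nullable = true),
    StructField("col2", StringType, nullable = true)
  ))
}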
The problem starts here:
val receiver = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_AND_DISK_SER)
val sqlContext = new SQLContext(ssc.sparkContext)
receiver.foreachRDD { rdd =>
  log.info("Received RDD attempt")
  if (!rdd.isEmpty()) {
    rdd.foreach(a => rddProcessor.processingRDD(a, sqlContext))
  }
}
However, when I process only the first element of the RDD, there is no problem:
val receiver = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_AND_DISK_SER)
val sqlContext = new SQLContext(ssc.sparkContext)
receiver.foreachRDD { rdd =>
  log.info("Received RDD attempt")
  if (!rdd.isEmpty()) {
    rddProcessor.processingRDD(rdd.first(), sqlContext)
  }
}
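The only structural difference I see between the two versions is where the call runs: rdd.first() brings a single element back to the driver, while rdd.foreach ships the closure (including the captured sqlContext) to the executors. For comparison, I would expect a driver-side variant like the following to behave like the first() case; this is just a sketch mirroring my snippet above, and it assumes each batch is small enough to collect:

receiver.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // collect() pulls the whole batch to the driver, so processingRDD
    // runs driver-side with a live SQLContext, like the rdd.first() case
    rdd.collect().foreach(a => rddProcessor.processingRDD(a, sqlContext))
  }
}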
I really don't understand why this is a problem. If anyone has a hint, I would appreciate it.
@EDIT This is how I define the StreamingContext:
val sparkConf = new SparkConf().setAppName("KafkaConsumer")
val ssc = new StreamingContext(sparkConf, Milliseconds(batchDuration))
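For completeness, the rest of the driver program is wired up in the usual way; the kafkaParams and topic values below are placeholders, not my actual configuration:

// Assumed placeholders; the real values are not shown in the post.
val kafkaParams = Map("zookeeper.connect" -> "localhost:2181", "group.id" -> "kafka-consumer")
val topic = "my-topic"

// ... createStream / foreachRDD blocks as shown above ...

ssc.start()
ssc.awaitTermination()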
Can you provide the code where you define your ssc: SparkContext? – ponkin
OK, I have added it to the post – kasiula03