2016-08-05 136 views
0

我有一些非典型問題。當我嘗試處理從kafka接收到的rdd時,我嘗試訪問sparkContext時得到異常(java.lang.NullPointerException)。 RDDProcessor是序列化Spark Streaming丟失SparkContext

def convertArrayToDataFrame(byteArray: Array[Byte], sqlContext: SQLContext) = { 
val stringFromByte = b2s(byteArray) 
val rdd = sqlContext.sparkContext.parallelize(stringFromByte.split("\n")) 
val rows = rdd.map(_.split(";")).map(attributes => Row.fromSeq(attributes.toSeq)) 
val dateframe = sqlContext.createDataFrame(rows,RDDComponents.schema) 
dateframe 
} 

問題開始這樣的:

val receiver = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_AND_DISK_SER) 
val sqlContext = new SQLContext(ssc.sparkContext) 
receiver.foreachRDD { rdd => 
    log.info("Received RDD attempt") 
    if (!rdd.isEmpty()) { 
     rdd.foreach(a => rddProcessor.processingRDD(a, sqlContext)) 
    } 

然而,當我只處理第一RDD,不會出現問題

val receiver = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_AND_DISK_SER) 
val sqlContext = new SQLContext(ssc.sparkContext) 
receiver.foreachRDD { rdd => 
    log.info("Received RDD attempt") 
    if (!rdd.isEmpty()) { 
    rddProcessor.processingRDD(rdd.first(), sqlContext) 
    } 

我真的不知道爲什麼它如此有問題的。如果有人有技巧我會很感激

@EDIT 那我定義的StreamingContext

val sparkConf = new SparkConf().setAppName("KafkaConsumer") 
val ssc = new StreamingContext(sparkConf, Milliseconds(batchDuration)) 
+0

你能否提供代碼,在哪裏定義你的ssc:SparkContext? – ponkin

+0

好的,我添加了這個帖子 – kasiula03

回答

0

嘛,SparkContext不可序列化,並通過SparkSession,在那裏它被標記爲@transientSqlContext是可用的。所以如果你不能寫processingRDD以至於它從不使用SparkContext,你不能在需要序列化的lambda中使用它,例如foreachmap的參數(但不是foreachRDD的! )。

+0

我明白了,但爲什麼它爲一個rdd工作?當對象序列化時,所有的函數都會被序列化? – kasiula03

+0

'foreachRDD'不會將其參數發送給其他節點,它完全在驅動程序節點上運行(而'rdd.first()'將該節點從其他節點發送到驅動程序節點)。 'rdd.foreach'確實需要發送它的參數給每個節點在那裏執行它。 –

+0

我怎麼能跳過這個問題當我必須用sparkContext處理這個rdd? rdd.colect()的作品,但它不是正確的方式 – kasiula03