2016-01-15 42 views
15

我想創建一個新的 mongodb RDD,每當我進入foreachRDD時。不過我有序列化問題:Spark Streaming:foreachRDD更新我的mongo RDD

mydstream 
    .foreachRDD(rdd => { 
     val mongoClient = MongoClient("localhost", 27017) 
     val db = mongoClient(mongoDatabase) 
     val coll = db(mongoCollection) 
     // ssc is my StreamingContext 
     val modelsRDDRaw = ssc.sparkContext.parallelize(coll.find().toList) }) 

這會給我一個錯誤:

object not serializable (class: org.apache.spark.streaming.StreamingContext, value: [email protected]) 

任何想法?

+0

'SparkContext'不是可序列化的,因此您不能在任何轉換或操作方法中使用,您只能在驅動程序類中使用。 – Shankar

+0

是否有任何具體的原因,你爲什麼要將列表轉換爲rdd裏面的foreachRDD方法? – Shankar

回答

7

你可以嘗試使用rdd.context返回無論是SparkContext或SparkStreamingContext(如果RDD是DSTREAM)。

mydstream foreachRDD { rdd => { 
     val mongoClient = MongoClient("localhost", 27017) 
     val db = mongoClient(mongoDatabase) 
     val coll = db(mongoCollection) 
     val modelsRDDRaw = rdd.context.parallelize(coll.find().toList) }) 

實際上,似乎RDD也有一個.sparkContext方法。我真的不知道差別,也許他們是別名(?)。

2

根據我的理解,如果你有一個「不可序列化」的對象,你需要將它傳遞給foreachPartition,這樣你可以在運行你的處理之前連接到每個節點上的數據庫。

mydstream.foreachRDD(rdd => { 
     rdd.foreachPartition{ 
      val mongoClient = MongoClient("localhost", 27017) 
      val db = mongoClient(mongoDatabase) 
      val coll = db(mongoCollection) 
      // ssc is my StreamingContext 
      val modelsRDDRaw = ssc.sparkContext.parallelize(coll.find().toList) }}) 
+0

這將無法正常工作,因爲ssc不可序列化。 –

+0

您可以嘗試在rdd.foreachPartition'val ssc = StreamingContext.getOrCreate(checkpointdirectory,functionToCreateContext _)之前在foreachRDD中創建您的ssc' – Rami