2016-10-06 85 views
0

我想在我的測試框架中設置Spark-MongoDB連接器。我的StreamingContext設置是這樣的:Spark Mongodb連接器單元測試

val conf = new SparkConf() 
      .setMaster("local[*]") 
      .setAppName("test") 
      .set("spark.mongodb.input.uri", "mongodb://localhost:27017/testdb.testread") 
      .set("spark.mongodb.output.uri", "mongodb://localhost:27017/testdb.testwrite") 

lazy val ssc = new StreamingContext(conf, Seconds(1))

每當我試圖建立這樣一個DSTREAM:

val records = new ConstantInputDStream(ssc, ssc.sparkContext.makeRDD(seq))

我被打到這個錯誤

java.lang.IllegalStateException:無法調用已停止的SparkC上的方法ontext。

看起來上下文正在啓動,然後立即停止,但我找不出原因。該日誌不會給出任何錯誤。這是它完成啓動,然後立即停止:

調試] 2016年10月6日18:29:51625 org.spark_project.jetty.util.component.AbstractLifeCycle setStarted - STARTED @ 4858ms osjsServletContextHandler @ 33b85bc {/metrics/json,null,AVAILABLE} [WARN] 2016-10-06 18:29:51,660 org.apache.spark.streaming.StreamingContext logWarning - StreamingContext尚未啓動 [DEBUG] 2016-10-06 18 :29:51,662 org.spark_project.jetty.util.component.AbstractLifeCycle setStopping - stops [email protected] [DEBUG] 2016-10-06 18:29:51,664 org.spark_project.jetty.server .Server doStop - 正常關機[email protected]作者:

當我刪除MongoDB的連接設置不關閉,一切都很好(除了我不能讀/寫MONGO :()

編輯: 這是測試,我試着寫信給mongo。但是,我的測試套件甚至在我到達這一點之前就失敗了。

"read from kafka queue" in new SparkScope{ 

    val stream = KafkaUtils.createDirectStream[String, String](
    ssc, 
    PreferConsistent, 
    Subscribe[String, String](List("topic"), 
     Map[String, Object](
     "bootstrap.servers"->s"localhost:${kServer.kafkaPort}", 
     "key.deserializer" -> classOf[StringDeserializer], 
     "value.deserializer" -> classOf[StringDeserializer], 
     "group.id" -> "testing", 
     "auto.offset.reset" -> "latest", 
     "enable.auto.commit" -> (false: java.lang.Boolean) 
    ) 
    ) 
) 
    val writeConfig = WriteConfig(Map(
    "collection"->"testcollection", 
    "writeConcern.w"->"majority", 
    "db"->"testdb" 
), Some(WriteConfig(ssc.sparkContext))) 

    stream.map(r => (r.key.toLong, r.value.toLong)) 
    .reduceByKey(_+_) 
    .map{case (k,v) => { 
     val d = new Document() 
     d.put("key", k) 
     d.put("value", v) 
     d 
    }} 
    .foreachRDD(rdd => rdd.saveToMongoDB(writeConfig)) 

    ssc.start 
    (1 until 10).foreach(x => producer.send(KafkaProducerRecord("topic", "1", "1"))) 
    ssc.awaitTerminationOrTimeout(1500) 
    ok 
} 

當我嘗試創建一個階集流在這裏發生該故障:

"return a single record with the correct sum" in new SparkScope{ 
    val stream = new ConstantInputDStream(ssc, ssc.sparkContext.makeRDD(seq)) 
    val m = HashMap.empty[Long,Long] 
    FlattenTimeSeries.flatten(stream).foreachRDD(rdd => m ++= rdd.collect()) 
    ssc.start() 
    ssc.awaitTerminationOrTimeout(1500) 
    m.size === 1 and m(1) === 20 
    } 

的SparkScope類只是創建了我上面顯示的StreamingContext和測試後調用ssc.stop()

+0

很奇怪 - 在這個例子中,你根本沒有對Mongo做任何事情 - 你能擴展它嗎? – Ross

回答

1

明白了。問題是SparkConf變量未聲明lazy,但StreamingContext是。我不知道爲什麼這很重要,但確實如此。固定。

+0

嗨用戶1748268,我一直在試圖保存數據到MongoDB,但仍然不成功。您可以請分享您的簡化形式的正在運行的項目,以便我可以看看並繼續。在此先感謝,乾杯:) –

+0

嗨@DynamicRemo。我上面發佈的代碼基本上是完整的,實際上完美的工作。我碰到的問題是specs2相關的(我的範圍是一個抽象類而不是特質)。你有什麼具體問題?也許我可以幫忙 – Tim