2014-10-02 34 views
0

我想用FunSuite通過用新的功能擴展FunSuite測試我Spark工作,叫localTest,運行帶有默認SparkContext測試:使用FunSuite測試星火引發NullPointerException異常

class SparkFunSuite extends FunSuite { 

    def localTest(name : String)(f : SparkContext => Unit) : Unit = { 
    val conf = new SparkConf().setAppName(name).setMaster("local") 
    val sc = new SparkContext(conf) 
    try { 
     this.test(name)(f(sc)) 
    } finally { 
     sc.stop 
    } 
    } 
} 

然後我可以輕鬆地添加測試,我的測試套件:

class MyTestSuite extends SparkFunSuite { 

    localTest("My Spark test") { sc => 
    assertResult(2)(sc.parallelize(Seq(1,2,3)).filter(_ <= 2).map(_ + 1).count) 
    } 
} 

的問題是,當我運行測試中,我得到了NullPointerException

[info] MyTestSuite: 
[info] - My Spark test *** FAILED *** 
[info] java.lang.NullPointerException: 
[info] at org.apache.spark.SparkContext.defaultParallelism(SparkContext.scala:1215) 
[info] at org.apache.spark.SparkContext.parallelize$default$2(SparkContext.scala:435) 
[info] at MyTestSuite$$anonfun$1.apply(FunSuiteTest.scala:24) 
[info] at MyTestSuite$$anonfun$1.apply(FunSuiteTest.scala:23) 
[info] at SparkFunSuite$$anonfun$localTest$1.apply$mcV$sp(FunSuiteTest.scala:13) 
[info] at SparkFunSuite$$anonfun$localTest$1.apply(FunSuiteTest.scala:13) 
[info] at SparkFunSuite$$anonfun$localTest$1.apply(FunSuiteTest.scala:13) 
[info] at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22) 
[info] at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85) 
[info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) 
[info] ... 

什麼是造成NullPointerException?我在這方面使用Spark的方式不正確嗎?

我正在使用Scala 2.10.4和spark-core 1.0.2和scalatest 2.2.2。

+0

如果你調用'.set(「spark.default.parallelism」,「n」)',其中'n'是核心數量,在'setMaster'之後,NPE是否會消失? – huitseeker 2014-10-03 14:01:55

+0

@huitseeker同樣的錯誤。我不知道到底是什麼零... ... – mariop 2014-10-03 14:46:16

+0

Spark自己的單元測試使用了類似的模式(我們使用ScalaTest的BeforeAndAfterAll和BeforeAndAfterEach特性來執行此設置),所以我很驚訝這不起作用。您是否並行運行多個測試(例如,您是否曾經在同一個JVM中同時運行SparkContexts)? – 2014-10-03 15:18:58

回答

1

這個不起作用的原因是我濫用FunSuite.test。該方法在調用時會註冊一個新測試,即構建FunSuite時。測試運行時將會調用測試。但我的localTest在撥打FunSuite.test之前和之後會執行一些操作。尤其是,在登錄this.test(name)(f(sc))後,它停止SparkContext。當測試被調用時,sc停止,並且導致的taskScheduler字段SparkContxt。使用FunSuite正確的方法是:

import org.scalatest.FunSuite 

import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 


class SparkFunSuite extends FunSuite { 

    def localTest(name : String)(f : SparkContext => Unit) : Unit = { 
    this.test(name) { 
     val conf = new SparkConf() 
     .setAppName(name) 
     .setMaster("local") 
     .set("spark.default.parallelism", "1") 
     val sc = new SparkContext(conf) 
     try { 
     f(sc) 
     } finally { 
     sc.stop() 
     } 
    } 
    } 

} 

class MyTestSuite extends SparkFunSuite { 

    localTest("My Spark test") { sc => 
    assertResult(2)(sc.parallelize(Seq(1,2,3)).filter(_ <= 2).map(_ + 1).count) 
    } 
} 
4

如果您在多個類運行SparkContexts,請確保你把parallelExecution in Test := false在build.sbt。我在運行命令時遇到了問題:sbt test。我會得到一個NPEPARSING_ERROR由在JVM中運行的多個SparkContexts引起。

+0

真棒 - 我一直在尋找這個。 – 2016-02-08 13:03:22

+0

這需要在Spark測試文獻中突出的地方 – Max 2016-08-23 17:40:12

相關問題