
I have Scala code that looks like the following. When I launch the Spark job via spark-submit, it fails with java.lang.NoClassDefFoundError: Could not initialize class.

 
import java.io.{File, FileReader}
import java.util.Properties

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.{Row, SaveMode}
import org.json4s.DefaultFormats

object ErrorTest {
  case class APIResults(status: String, col_1: Long, col_2: Double, ...)

  def funcA(rows: ArrayBuffer[Row])(implicit defaultFormats: DefaultFormats): ArrayBuffer[APIResults] = {
    // call some API and get results, returned as APIResults
    ...
  }

  // MARK: load properties
  val props = loadProperties()

  private def loadProperties(): Properties = {
    val configFile = new File("config.properties")
    val reader = new FileReader(configFile)
    val props = new Properties()
    props.load(reader)
    props
  }

  def main(args: Array[String]): Unit = {
    val prop_a = props.getProperty("prop_a")

    val session = Context.initialSparkSession() // project-specific helper
    import session.implicits._

    // aggregateByKey helpers: collect every Row that shares a key into one ArrayBuffer
    val initialSet = ArrayBuffer.empty[Row]
    val addToSet = (s: ArrayBuffer[Row], v: Row) => s += v
    val mergePartitionSets = (p1: ArrayBuffer[Row], p2: ArrayBuffer[Row]) => p1 ++= p2

    val sql1 =
      s"""
       select * from tbl_a where ...
      """

    session.sql(sql1)
      .rdd.map { row => implicit val formats = DefaultFormats; (row.getLong(6), row) }
      .aggregateByKey(initialSet)(addToSet, mergePartitionSets)
      .repartition(40)
      .map { case (rowNumber, rows) => implicit val formats = DefaultFormats; funcA(rows) }
      .flatMap(x => x)
      .toDF()
      .write.mode(SaveMode.Overwrite).saveAsTable("tbl_b")
  }
}

When I run it via spark-submit, it throws Caused by: java.lang.NoClassDefFoundError: Could not initialize class staging_jobs.ErrorTest$. But if I move val props = loadProperties() down so that it is the first line of the main method, the error disappears. Can anyone explain this behavior? Thanks very much!

Caused by: java.lang.NoClassDefFoundError: Could not initialize class staging_jobs.ErrorTest$ 
    at staging_jobs.ErrorTest$$anonfun$main$1.apply(ErrorTest.scala:208) 
    at staging_jobs.ErrorTest$$anonfun$main$1.apply(ErrorTest.scala:208) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) 
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:243) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:190) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:188) 
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:193) 
    ... 8 more 

Answers


I ran into the same problem as you. I had defined a method convert outside the main method. When I used dataframe.rdd.map{x => convert(x)} inside main, NoClassDefFoundError: Could not initialize class Test$ occurred.

But when I instead used a function object convertor, with the same code as the convert method, defined inside the main method, the error no longer occurred.

I was using Spark 2.1.0 and Scala 2.11. Could this be a bug in Spark?
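
For reference, a minimal sketch of the two variants this answer describes (Test, convert, and convertor are the names from the answer; the DataFrame and its contents are placeholders for illustration):

import org.apache.spark.sql.{Row, SparkSession}

object Test {
  // Variant 1: a method on the object. A closure that calls it must
  // initialize Test$ on the executors first; if that initializer can
  // fail there, the job dies with "Could not initialize class Test$".
  def convert(x: Row): String = x.mkString(",")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().getOrCreate()
    val dataframe = spark.range(5).toDF("id")

    // Variant 1 (failed for this answerer):
    // dataframe.rdd.map { x => convert(x) }.collect()

    // Variant 2 (worked): a function value defined inside main. The
    // closure serializes convertor itself instead of calling back
    // into the Test$ singleton.
    val convertor: Row => String = x => x.mkString(",")
    dataframe.rdd.map(convertor).collect().foreach(println)
  }
}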


I think the problem is that val props = loadProperties() defines a member of the outer object that contains main. That member then has to be created on the executors as well, and the executors do not have the environment the driver has. Concretely: the closures in main reference ErrorTest$, so every executor must initialize that class; the initializer runs loadProperties(), which tries to read config.properties from the executor's local filesystem and fails, and the failed static initialization then surfaces as NoClassDefFoundError: Could not initialize class. Moving the val into main keeps the file access on the driver, which is why the error disappears.
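
Building on that explanation, here is a minimal sketch of the fix the asker found (the object name ErrorTestFixed and the range-based job are placeholders, not the original code): with no val at object level, the static initializer of the companion object is trivially empty and succeeds on the executors, even though config.properties exists only on the driver.

import java.io.{File, FileReader}
import java.util.Properties

import org.apache.spark.sql.SparkSession

object ErrorTestFixed {
  // No top-level vals: the static initializer of ErrorTestFixed$ is
  // empty, so executors can initialize the class without the file.
  private def loadProperties(): Properties = {
    val reader = new FileReader(new File("config.properties"))
    val props = new Properties()
    try props.load(reader) finally reader.close()
    props
  }

  def main(args: Array[String]): Unit = {
    val props = loadProperties()            // driver-only: the file exists here
    val propA = props.getProperty("prop_a") // a plain String, safe to capture

    val session = SparkSession.builder().getOrCreate()

    // The closure captures only the String propA; the class is still
    // loaded on the executors, but its initializer now succeeds.
    session.range(10).rdd
      .map(n => s"$propA-$n")
      .collect()
      .foreach(println)
  }
}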