2016-10-20 129 views

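Spark random forest classifier throws java.lang.OutOfMemoryError during training

I am trying multiclass classification in Spark. I have 300,000 predefined classes. Transforming the data works without any problem, but when I try to train the model I run out of memory. How can I solve this?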

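import java.util.Locale
import scala.collection.mutable.ListBuffer

import org.apache.spark.SparkConf
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature.{CountVectorizer, StopWordsRemover, StringIndexer, Tokenizer}
import org.apache.spark.sql.SparkSession
// Resha (the Turkish stemmer used below) must also be imported from its own library.

object Test { 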

    var num = 50 
    var savePath = "c:/Temp/SparkModel/" 
    var stemmer = Resha.Instance 

    var STOP_WORDS: Set[String] = Set() 

    def cropSentence(s: String) = { 
    s.replaceAll("\\([^\\)]*\\)", "") 
     .replaceAll(" - ", " ") 
     .replaceAll("-", " ") 
     .replaceAll(" +", " ") 
     .replaceAll(",", " ").trim() 
    } 

    def main(args: Array[String]): Unit = { 

    val sc = new SparkConf().setAppName("Test").setMaster("local[*]") 
     .set("spark.sql.warehouse.dir", "D:/Temp/wh") 
     .set("spark.executor.memory", "12g") 
     .set("spark.driver.memory", "4g") 
     .set("spark.hadoop.validateOutputSpecs", "false") 

    val spark = SparkSession.builder.appName("Java Spark").config(sc).getOrCreate() 
    import spark.implicits._ 

    val mainDataset = spark.sparkContext.textFile("file:///C:/Temp/classifications.csv") 
     .map(_.split(";")) 
     .map(tokens => {
     // Lowercase with Turkish locale rules, clean the product name, then stem each word
     val list = new ListBuffer[String]()
     val token0 = cropSentence(tokens(0).toLowerCase(Locale.forLanguageTag("tr-TR")))
     token0.split("\\s+").foreach(t => list += stemmer.stem(t))
     (tokens(1), tokens(0), list.mkString(" "))
     }).toDF("className","productNameOrg","productName") 


    val classIndexer = new StringIndexer() 
     .setInputCol("className") 
     .setOutputCol("label") 

    val classIndexerModel = classIndexer.fit(mainDataset) 
    var mainDS=classIndexerModel.transform(mainDataset) 
    classIndexerModel.write.overwrite.save(savePath + "ClassIndexer") 

    //Tokenizer 
       val tokenizer = new Tokenizer()         
          .setInputCol("productName")      
          .setOutputCol("words_nonfiltered") 
    //StopWords 
       val remover = new StopWordsRemover() 
          .setInputCol("words_nonfiltered") 
          .setOutputCol("words") 
          .setStopWords(Array[String]("stop1","stop2","stop3")) 
    //CountVectorize 

       val countVectorizer = new CountVectorizer() 
          .setInputCol("words") 
          .setOutputCol("features") 

       val rfc = new RandomForestClassifier()       
         .setLabelCol("label") 
         .setNumTrees(50) 
         .setMaxDepth(15) 
         .setFeatureSubsetStrategy("auto") 
         .setFeaturesCol("features") 
         .setImpurity("gini") 
         .setMaxBins(32) 


      val pipeline = new Pipeline().setStages(Array(tokenizer,remover,countVectorizer,rfc)) 
      val train =mainDS 
      val model = pipeline.fit(train) // <== the OutOfMemoryError is thrown here
      model.write.overwrite.save(savePath+"RandomForestClassifier") 

    } 
} 

Error:

16/10/21 00:54:23 INFO ExternalAppendOnlyMap: Thread 101 spilling in-memory map of 2.9 GB to disk (1 time so far) 
16/10/21 00:56:58 INFO ExternalAppendOnlyMap: Thread 98 spilling in-memory map of 2.7 GB to disk (2 times so far) 
16/10/21 00:57:05 INFO ExternalAppendOnlyMap: Thread 101 spilling in-memory map of 2.7 GB to disk (2 times so far) 
Exception in thread "shuffle-server-0" java.lang.OutOfMemoryError: Java heap space 
16/10/21 01:02:37 WARN SingleThreadEventExecutor: Unexpected exception from an event executor: 
java.lang.OutOfMemoryError: Java heap space 
16/10/21 01:02:43 WARN TaskMemoryManager: leak 1575.8 MB memory from [email protected] 
16/10/21 01:02:42 ERROR Utils: uncaught error in thread Spark Context Cleaner, stopping SparkContext 
java.lang.OutOfMemoryError: Java heap space 
    at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:176) 
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1249) 
    at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:172) 
    at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:67) 
16/10/21 01:02:37 WARN NioEventLoop: Unexpected exception in the selector loop. 
java.lang.OutOfMemoryError: Java heap space 
16/10/21 01:02:44 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 126580 ms exceeds timeout 120000 ms 
16/10/21 01:03:56 ERROR TaskSchedulerImpl: Lost executor driver on localhost: Executor heartbeat timed out after 126580 ms 
16/10/21 01:03:58 ERROR Executor: Exception in task 0.0 in stage 12.0 (TID 25) 
java.lang.OutOfMemoryError: Java heap space 
Exception in thread "Spark Context Cleaner" java.lang.OutOfMemoryError: Java heap space 
Exception in thread "Executor task launch worker-4" java.lang.OutOfMemoryError: Java heap space 
16/10/21 01:06:00 WARN TaskSetManager: Lost task 1.0 in stage 12.0 (TID 26, localhost): ExecutorLostFailure (executor driver exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 126580 ms 
16/10/21 01:06:00 ERROR TaskSetManager: Task 1 in stage 12.0 failed 1 times; aborting job 

Are you running in cluster mode or in client mode? –

I am running in client mode with 8 cores. – kkurt

Answers

1

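This usually happens when the driver memory has not been tuned.

What you are doing wrong here is requesting 4g of driver memory by setting it in the SparkConf. As the documentation states, that does not work in client mode, because the driver JVM has already started by the time the SparkConf is read. You have to pass the value explicitly when you submit the application.

See the configuration reference here: https://spark.apache.org/docs/1.6.1/configuration.html#available-properties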
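
To check whether the requested driver memory actually reached the JVM, something along these lines may help. It is only a sketch: `HeapCheck`, `parseSize`, and `heapLooksApplied` are made-up names, the 0.8 tolerance is an arbitrary assumption (the JVM usually reports a little less than the configured maximum), and the jar name in the comment is a placeholder.

```scala
// Launch with the heap size given on the command line, for example:
//   spark-submit --driver-memory 4g --class Test your-app.jar   (jar name is a placeholder)
object HeapCheck {

  /** Parses sizes such as "4g" or "512m" into bytes (only the g and m suffixes are handled). */
  def parseSize(s: String): Long = {
    val t = s.trim.toLowerCase
    val n = t.dropRight(1).toLong
    t.last match {
      case 'g'   => n * 1024L * 1024L * 1024L
      case 'm'   => n * 1024L * 1024L
      case other => throw new IllegalArgumentException(s"unsupported suffix: $other")
    }
  }

  /** True if the JVM's max heap is at least `fraction` of the requested size. */
  def heapLooksApplied(requested: String, maxHeapBytes: Long, fraction: Double = 0.8): Boolean =
    maxHeapBytes >= parseSize(requested) * fraction

  def main(args: Array[String]): Unit = {
    // Max heap the current JVM will try to use, in bytes
    val maxHeap = Runtime.getRuntime.maxMemory()
    println(s"JVM max heap: ${maxHeap / (1024 * 1024)} MB")
    if (!heapLooksApplied("4g", maxHeap))
      println("Heap is smaller than requested; the setting was probably not applied at launch.")
  }
}
```

Calling the same check at the top of `main` in the question's `Test` object would show whether the `spark.driver.memory` value set in the SparkConf had any effect.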

I passed this parameter to spark-submit and got the same error on the same line: val model = pipeline.fit(train); <== OOM – kkurt
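
Since raising the driver memory did not help, a rough estimate of what the forest has to hold in memory may be worth doing. The sketch below rests on several assumptions that are not confirmed anywhere in this thread: that training keeps roughly one double per (feature, bin, class) combination for each node being split, that "auto" feature subsetting means the square root of the feature count for classification, and that the vocabulary reaches CountVectorizer's default limit of 2^18 terms. The object and function names are made up for illustration.

```scala
object ForestMemoryEstimate {

  /** Bytes of statistics for one tree node: features x bins x classes, 8 bytes per double. */
  def perNodeStatsBytes(numFeatures: Long, maxBins: Long, numClasses: Long): Long =
    numFeatures * maxBins * numClasses * 8L

  /** Number of features considered per node under square-root subsetting (assumed for "auto"). */
  def sqrtSubset(numFeatures: Long): Long =
    math.ceil(math.sqrt(numFeatures.toDouble)).toLong

  def main(args: Array[String]): Unit = {
    val vocabSize  = 1L << 18 // assumption: vocabulary hits CountVectorizer's default vocabSize
    val numClasses = 300000L  // from the question
    val maxBins    = 32L      // from setMaxBins(32) in the question
    val bytes = perNodeStatsBytes(sqrtSubset(vocabSize), maxBins, numClasses)
    println(f"approx. ${bytes / math.pow(1024, 3)}%.1f GiB of statistics per node")
  }
}
```

If these assumptions are close to reality, the number of classes dominates the estimate, so a few extra gigabytes of heap are unlikely to be enough. Reducing the number of classes, lowering maxBins, or limiting the vocabulary size would shrink it far more.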
