
SparkException during DataFrame write to HDFS

I am running the code below on a Hadoop cluster with YARN. It parses a set of emails, performs sentiment annotation, and finally writes the resulting DataFrames as Parquet tables on HDFS. Unfortunately, it keeps failing on the last DataFrame write to HDFS, line #66 (the final `dfSentiment.write` call), with the error shown at the bottom. What I cannot explain is that whenever I run it on a small sample of the dataset, it finishes successfully.

import java.util.Properties

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SQLContext, SaveMode}

import edu.stanford.nlp.pipeline.{Annotation, StanfordCoreNLP}
import edu.stanford.nlp.sentiment.SentimentCoreAnnotations

object ETLDriver {

  val appName = "ENRON-etl"
  val conf = new SparkConf().setAppName(appName)
  val sc = new SparkContext(conf)

  def main(args: Array[String]): Unit = {
    val allExtracted = sc.objectFile[(String, Seq[String])](Commons.ENRON_EXTRACTED_TXT)
    // Testing on a sub-sample
    // val allExtracted = sc.objectFile[(String, Seq[String])](Commons.ENRON_EXTRACTED_TXT).sample(false, 0.01, 42)

    // get custodians from csv file stored in HDFS
    val csv = sc.textFile(Commons.ENRON_CUSTODIANS_CSV_HDFS).map { line => line.split(",") }
    val custodians = sc.broadcast(csv.map { record => Custodian(record(0), record(1), Option(record(2))) }.collect().toSeq)

    // parse emails, dropping the ones that fail to parse
    val allParsed: RDD[MailBox] = allExtracted.map { case (mailbox, emails) =>
      val parsedEmails = emails flatMap { email =>
        try Some(EmailParser.parse(email, custodians.value))
        catch {
          case e: EmailParsingException => None
        }
      }
      MailBox(mailbox, parsedEmails)
    }

    // classify sentiment and save w/o body
    val mailboxesSentiment = allParsed.map { mailbox =>
      // load sentiment annotator pipeline
      val nlpProps = new Properties
      nlpProps.setProperty("annotators", "tokenize, ssplit, pos, parse, lemma, sentiment")
      val pipeline = new StanfordCoreNLP(nlpProps)
      // annotation
      val emailsWithSentiment = mailbox.emails.map { email =>
        val document = new Annotation(email.body)
        pipeline.annotate(document)
        val sentiment = document.get[String](classOf[SentimentCoreAnnotations.ClassName])
        EmailWithSentiment(email.date, email.from, email.to ++ email.cc ++ email.bcc, email.subject, sentiment)
      }

      MailBoxWithSentiment(mailbox.name, emailsWithSentiment)
    }

    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val dfFull = allParsed.toDF()
    dfFull.write.mode(SaveMode.Overwrite).parquet(Commons.ENRON_FULL_DATAFRAME)
    val dfSentiment = mailboxesSentiment.toDF()
    dfSentiment.write.mode(SaveMode.Overwrite).parquet(Commons.ENRON_SENTIMENT_DATAFRAME)
  }
}

Error:

AM Container for appattempt_1456482988572_5307_000001 exited with exitCode: 15 
For more detailed output, check application tracking page:http://head05.custer_url:8088/cluster/app/application_1456482988572_5307Then, click on links to logs of each attempt. 
Diagnostics: Exception from container-launch. 
Container id: container_1456482988572_5307_01_000001 
Exit code: 15 
Stack trace: ExitCodeException exitCode=15: 
at org.apache.hadoop.util.Shell.runCommand(Shell.java:576) 
at org.apache.hadoop.util.Shell.run(Shell.java:487) 
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:753) 
at org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor.launchContainer(LinuxContainerExecutor.java:371) 
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302) 
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82) 
at java.util.concurrent.FutureTask.run(FutureTask.java:262) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at java.lang.Thread.run(Thread.java:745) 
Shell output: main : command provided 1 
main : run as user is lsde03 
main : requested yarn user is lsde03 
Container exited with a non-zero exit code 15 
Failing this attempt 

More Spark logs here


Can you point out which line is #66, since there are no line numbers on StackOverflow? Is it this one? `dfSentiment.write.mode(SaveMode.Overwrite).parquet(Commons.ENRON_SENTIMENT_DATAFRAME)` – morganw09dev


Yes, that's exactly the line. –

Answer


Line numbers tend to be rather unhelpful. They usually point to the line that triggers the job rather than to the actual problem, so it is typically some save or collect operation.
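
To illustrate the point, here is a minimal sketch (not taken from the question's code, and assuming an existing `SparkContext` named `sc`): because transformations are lazy, an error introduced in a `map` is only reported at the action that forces the computation.

    // Hypothetical example: the bug lives in the transformation,
    // but the stack trace points at the action below.
    val rdd = sc.parallelize(Seq("1", "2", "oops"))
    val parsed = rdd.map(_.toInt)            // lazy: nothing runs here
    parsed.saveAsTextFile("/tmp/parsed")     // the NumberFormatException surfaces on this line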

In the logs you linked there is an ExecutorLostFailure. It's not clear to me why, but I find that when Spark doesn't give a reason, memory is usually a good place to start. How big is the data you are broadcasting? You may want to check your memory settings. This is a good article for understanding Spark on YARN:

http://www.wdong.org/wordpress/blog/2015/01/08/spark-on-yarn-where-have-all-my-memory-gone/
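
As a rough sketch of what tuning those settings can look like (the property names below are standard Spark 1.x settings for YARN, not something taken from your job; the values are placeholders):

    // Sketch only: when YARN kills containers without a clear reason, the JVM heap
    // plus the off-heap overhead per container is usually what has to grow.
    val conf = new SparkConf()
      .setAppName("ENRON-etl")
      .set("spark.executor.memory", "8g")                 // JVM heap requested per executor
      .set("spark.yarn.executor.memoryOverhead", "2048")  // extra MB YARN adds on top of the heap

If you cannot touch the cluster configuration, the same properties can also be passed on the command line, e.g. `spark-submit --conf spark.yarn.executor.memoryOverhead=2048 ...`.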

Hope this helps.


According to HDFS, the total size of the files loaded into `allExtracted` is 3.3 GB. Thanks for the link; unfortunately I don't have access to the Spark/Hadoop configuration, so I suppose I can only pass some parameters with `spark-submit`. I tried running it with plenty of resources, but it still fails (`spark-submit --master yarn --deploy-mode cluster --num-executors 480 --executor-memory 12G --executor-cores 4 --driver-cores 4 --driver-memory 6G [...]`). In any case, if I comment out the sentiment extraction (the lines marked with `<<<` [here](http://pastebin.com/9zsLgPpF)) –