
Spark Scala error when saving a DataFrame to Hive

I have constructed a DataFrame by combining several arrays. When I try to save it to a Hive table, I get an ArrayIndexOutOfBoundsException. Below are the code and the error I get. I have also tried adding the case class outside the main def, but I still get the same error.

import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.sql.{Row, SQLContext, DataFrame} 
import org.apache.spark.ml.feature.RFormula 
import java.text._ 
import java.util.Date 
import org.apache.hadoop.conf.Configuration 
import org.apache.hadoop.fs._ 
import org.apache.spark.ml.regression.LinearRegressionModel 
import org.apache.spark.ml.classification.LogisticRegressionModel 
import org.apache.spark.ml.classification.DecisionTreeClassificationModel 
import org.apache.spark.ml.classification.RandomForestClassificationModel 
import org.apache.spark.ml.PipelineModel 
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator 
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator 
import org.apache.spark.mllib.evaluation.MulticlassMetrics 
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics 
import org.apache.spark.sql.hive.HiveContext 
//case class Rows(col1: String, col2: String, col3: String, col4: String, col5: String, col6: String) 
object MLRCreate{ 
//  case class Row(col1: String, col2: String, col3: String, col4: String, col5: String, col6: String) 

    def main(args: Array[String]) { 
      val conf = new SparkConf().setAppName("MLRCreate") 
      val sc = new SparkContext(conf) 
      val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) 
      import hiveContext.implicits._ 
     import hiveContext.sql 

      val ReqId = new java.text.SimpleDateFormat("yyyyMMddHHmmss").format(new java.util.Date()) 
      val dirName = "/user/ec2-user/SavedModels/"+ReqId 
      val df = Functions.loadData(hiveContext,args(0),args(1)) 
      val form = args(1).toLowerCase 
      val lbl = form.split("~") 

      var lrModel:LinearRegressionModel = null; 
      val Array(training, test) = df.randomSplit(Array(args(3).toDouble, (1-args(3).toDouble)), seed = args(4).toInt) 
      lrModel = Functions.mlr(training) 

      var columnnames = Functions.resultColumns(df).substring(1)
      var columnsFinal = columnnames.split(",")
      columnsFinal = "intercept" +: columnsFinal
      var coeff = lrModel.coefficients.toArray.map(_.toString)
      coeff = lrModel.intercept.toString +: coeff
      var stdErr = lrModel.summary.coefficientStandardErrors.map(_.toString)
      var tval = lrModel.summary.tValues.map(_.toString)
      var pval = lrModel.summary.pValues.map(_.toString)

      var Signif: Array[String] = new Array[String](pval.length)

      for (j <- 0 to pval.length - 1) {
        var sign = pval(j).toDouble
        sign = Math.abs(sign)
        if (sign <= 0.001) {
          Signif(j) = "***"
        } else if (sign <= 0.01) {
          Signif(j) = "**"
        } else if (sign <= 0.05) {
          Signif(j) = "*"
        } else if (sign <= 0.1) {
          Signif(j) = "."
        } else {
          Signif(j) = " "
        }
        println(columnsFinal(j) + "#########" + coeff(j) + "#########" + stdErr(j) + "#########" + tval(j) + "#########" + pval(j) + "########" + Signif(j))
      }
      case class Row(col1: String, col2: String, col3: String, col4: String, col5: String, col6: String)
          case class Row(col1: String, col2: String, col3: String, col4: String, col5: String, col6: String) 

      //  print(columnsFinali.mkString("#"),coeff.mkString("#"),stdErr.mkString("#"),tval.mkString("#"),pval.mkString("#")) 


      val sums = Array(columnsFinal, coeff, stdErr, tval, pval, Signif).transpose
      val rdd = sc.parallelize(sums).map(ys => Row(ys(0), ys(1), ys(2), ys(3), ys(4), ys(5)))
      // val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
      // import hiveContext.implicits._
      // import hiveContext.sql

      val result = rdd.toDF("Name", "Coefficients", "Std_Error", "tValue", "pValue", "Significance")
      result.show()
      result.saveAsTable("iaw_model_summary.IAW_" + ReqId)
      print(ReqId)
      lrModel.save(dirName)

    } 
} 

And below is the error I get:

16/05/12 07:17:25 ERROR Executor: Exception in task 2.0 in stage 23.0 (TID 839) 
java.lang.ArrayIndexOutOfBoundsException: 1 
     at IAWMLRCreate$$anonfun$5.apply(IAWMLRCreate.scala:96) 
     at IAWMLRCreate$$anonfun$5.apply(IAWMLRCreate.scala:96) 
     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
     at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
     at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) 
     at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
     at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
     at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
     at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
     at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
     at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
     at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
     at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
     at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
     at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) 
     at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) 
     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) 
     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
     at org.apache.spark.scheduler.Task.run(Task.scala:89) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 

Answer


I suggest you check the lengths of the arrays you are transposing: columnsFinal, coeff, stdErr, tval, pval, Signif. If any of them is shorter or longer than the others, some of the rows after the transpose will be incomplete, and Row(ys(0), ys(1), ys(2), ys(3), ys(4), ys(5)) will throw an ArrayIndexOutOfBoundsException when it indexes past the end of a short row. For example:

val a1 = Array(1,2,3)
val a2 = Array(5,6)
Array(a1, a2).transpose.foreach(x => println(x.toList))

This prints the following; Scala does not fill in nulls or anything else for you when transposing:

List(1, 5) 
List(2, 6) 
List(3) 
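
To track the mismatch down in the original job, one option is to print each array's length and fail fast before the transpose. Below is a minimal sketch using the variable names from the question; the parts sequence and the require check are illustrative additions, not part of the original code:

// Hypothetical sanity check: confirm all six summary arrays have the same length
// before transposing, so a mismatch fails here instead of inside the Row mapping.
val parts = Seq(
  "columnsFinal" -> columnsFinal,
  "coeff" -> coeff,
  "stdErr" -> stdErr,
  "tval" -> tval,
  "pval" -> pval,
  "Signif" -> Signif)
parts.foreach { case (name, arr) => println(s"$name length = ${arr.length}") }
require(parts.map(_._2.length).distinct.size == 1,
  "summary arrays have different lengths; transpose would produce ragged rows")
val sums = Array(columnsFinal, coeff, stdErr, tval, pval, Signif).transpose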

Thanks @mattinbits, it was my mistake: columnsFinal contained both the features and the label column, and I needed to get rid of that one... :) – Sam
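
For completeness, a sketch of the fix the comment describes: if the formula's label column is still present in columnsFinal, that array ends up one entry longer than coeff, stdErr, tval and pval, so dropping it realigns everything. The question does not show what Functions.resultColumns returns, so the filtering below is an assumption:

// Hypothetical fix: remove the label (the left-hand side of the "~" formula)
// from the column names so columnsFinal has one entry per coefficient.
val label = form.split("~")(0).trim
var columnsFinal = columnnames.split(",").map(_.trim).filterNot(_ == label)
columnsFinal = "intercept" +: columnsFinal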