0

enter image description here 嘗試對我的臨時表執行 select 時收到錯誤訊息。有人可以幫助我嗎?在 Zeppelin 上找不到臨時表(temp table)。

/**
 * Streaming linear-regression job: reads labeled points from the Cassandra
 * table `features.consumodata`, replays them every batch through a
 * `ConstantInputDStream`, trains a `StreamingLinearRegressionWithSGD` model on
 * that stream, and registers each batch of (label, prediction) pairs as the
 * temp table `liniarRegressionModel`.
 *
 * The whole job lives in the object body, so it runs only when the object is
 * first referenced; merely defining the object does not start the stream.
 *
 * NOTE(review): the temp tables are registered on the `SQLContext` created
 * here. A notebook `%sql` paragraph backed by a different `SQLContext` will
 * presumably not see them ("Table not found") -- confirm which context the
 * notebook interpreter uses.
 */
object StreamingLinReg {

  // Cassandra connection settings plus streaming options for this job.
  val conf = new SparkConf(true)
    .set("spark.cassandra.connection.host", "127.0.0.1")
    .setAppName("Streaming Liniar Regression")
    .set("spark.cassandra.connection.port", "9042")
    .set("spark.driver.allowMultipleContexts", "true")
    .set("spark.streaming.receiver.writeAheadLog.enable", "true")

  val sc = new SparkContext(conf)
  // One-second batch interval.
  val ssc = new StreamingContext(sc, Seconds(1))

  val sqlContext = new org.apache.spark.sql.SQLContext(sc)

  // Brings `toDF` into scope for the RDDs below.
  import sqlContext.implicits._

  // Each selected row is read as a String and parsed into a LabeledPoint.
  val trainingData = ssc.cassandraTable[String]("features", "consumodata")
    .select("consumo", "consumo_mensal", "soma_pf", "tempo_gasto")
    .map(LabeledPoint.parse)

  trainingData.toDF.registerTempTable("training")

  // Emits the same training RDD on every batch interval.
  val dstream = new ConstantInputDStream(ssc, trainingData)

  // NOTE(review): four columns are selected above (presumably one label and
  // three features), yet the initial weight vector has 100 entries -- confirm
  // this matches the dimension of the parsed feature vectors.
  val numFeatures = 100
  val model = new StreamingLinearRegressionWithSGD()
    .setInitialWeights(Vectors.zeros(numFeatures))
    .setNumIterations(1)
    .setStepSize(0.1)
    .setMiniBatchFraction(1.0)

  model.trainOn(dstream)

  // predictOnValues keeps the key, so every element is (label, prediction).
  model.predictOnValues(dstream.map(lp => (lp.label, lp.features))).foreachRDD { rdd =>
    // RegressionMetrics expects (prediction, observation) pairs, so the
    // (label, prediction) tuples are swapped before being handed over.
    val metrics = new RegressionMetrics(rdd.map(_.swap))
    // The metrics below are computed per batch but not yet consumed anywhere.
    val MSE = metrics.meanSquaredError // Mean squared error
    val RMSE = metrics.rootMeanSquaredError // Root mean squared error
    val MAE = metrics.meanAbsoluteError // Mean absolute error
    val Rsquared = metrics.r2
    // val explainedVariance = metrics.explainedVariance
    // The table keeps the original (label, prediction) column order.
    rdd.toDF.registerTempTable("liniarRegressionModel")
  }

  // Must stay inside the object: `ssc` is a member of this object.
  ssc.start()
  ssc.awaitTermination()
}


%sql
SELECT *
FROM liniarRegressionModel
LIMIT 10

當我對臨時表執行 select 時,會得到一條錯誤訊息。我是在執行完第一段(paragraph)之後才對臨時表執行 select 的。

org.apache.spark.sql.AnalysisException: Table not found:  liniarRegressionModel; line 1 pos 14 at  org.apache.spark.sql.catalyst.analysis. 
package$AnalysisErrorAt.failAnalysis (package.scala:42) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations 
$.getTable (Analyzer.scala:305) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations 
$$anonfun$apply$9.applyOrElse 
(Analyzer.scala:314) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations 
    $$anonfun$apply$9.applyOrElse(Analyzer.scala:309) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan 
    $$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan 
    $$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57) at org.apache.spark.sql.catalyst.trees.CurrentOrigin 
    $.withOrigin(TreeNode.scala:69) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators 
    (LogicalPlan.scala:56) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan 
    $$anonfun$1.apply(LogicalPlan.scala:54) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply 
    (LogicalPlan.scala:54) at  org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply 
    (TreeNode.scala:281) at scala.collection.Iterator 
    $$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$ 
    class.foreach(Iterator.scala:727) at  scala.collection.AbstractIterator.foreach 
    (Iterator.scala:1157) at scala.collection.generic.Growable $class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer. 
    $plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer. 
    $plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to 
    (TraversableOnce.scala:273) at scala.collection.AbstractIterator.to 
    (Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer 
    (TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer 
    (Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray 
    (TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray 
    (Iterator.scala:1157) 
+0

你可以附上其餘的 Scala 代碼嗎? – zero323

+0

你執行了第一段嗎? –

+0

是的。我執行第一段。 –

回答

0

執行代碼後我的輸出

import java.lang.Object 
import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.SparkContext._ 
import org.apache.spark.sql.cassandra._ 
import org.apache.spark.sql.SaveMode 
import org.apache.spark.sql 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.dstream.DStream 
import org.apache.spark.streaming.StreamingContext._ 
import com.datastax.spark.connector.streaming._ 
import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.mllib.linalg.{Vector, Vectors} 
import org.apache.spark.mllib.regression.LabeledPoint 
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD 
import org.apache.spark.streaming.{Seconds, StreamingContext} 
import org.apache.spark.streaming.dstream.ConstantInputDStream 
import org.apache.spark.mllib.evaluation.RegressionMetrics 
defined module StreamingLinReg 

FINISHED 
Took 15 seconds