0
[圖片:enter image description here] 在 Zeppelin 中找不到臨時表(temp table)。我嘗試對臨時表執行 select 時收到錯誤訊息,有人可以幫助我嗎?
/**
 * Streaming linear regression over the Cassandra table `features.consumodata`.
 *
 * The object body only *defines* the pipeline: configuration, contexts, the
 * training stream, the model and the output action that registers the
 * `liniarRegressionModel` temp table. Nothing is started here; call [[main]]
 * to start the streaming context.
 *
 * Scala initializes an `object` lazily, so merely defining it (for example in
 * a notebook paragraph) runs none of this code. Something has to reference it,
 * e.g. `StreamingLinReg.main(Array.empty)`.
 */
object StreamingLinReg {

  val conf = new SparkConf(true)
    .set("spark.cassandra.connection.host", "127.0.0.1")
    .setAppName("Streaming Liniar Regression")
    .set("spark.cassandra.connection.port", "9042")
    .set("spark.driver.allowMultipleContexts", "true")
    .set("spark.streaming.receiver.writeAheadLog.enable", "true")

  // Reuse contexts that already exist in this JVM instead of always building
  // new ones. A temp table is only visible through the SQLContext it was
  // registered on, so a private SQLContext hides `liniarRegressionModel` from
  // whatever context the host uses for SQL queries.
  // NOTE(review): presumably the notebook's %sql paragraph uses the host's own
  // SQLContext - confirm that getOrCreate returns that same instance there.
  // NOTE(review): when a SparkContext already exists, `conf` above is not
  // applied to it; verify the Cassandra settings are configured on the host.
  val sc = SparkContext.getOrCreate(conf)
  val ssc = new StreamingContext(sc, Seconds(1))
  val sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(sc)
  import sqlContext.implicits._

  val trainingData = ssc.cassandraTable[String]("features", "consumodata")
    .select("consumo", "consumo_mensal", "soma_pf", "tempo_gasto")
    .map(LabeledPoint.parse)
  trainingData.toDF.registerTempTable("training")

  // Replays the same training RDD on every batch interval.
  val dstream = new ConstantInputDStream(ssc, trainingData)

  // NOTE(review): the initial weight vector must have the same length as the
  // feature vectors produced by LabeledPoint.parse - confirm 100 is correct
  // for this table (only four columns are selected above).
  val numFeatures = 100

  val model = new StreamingLinearRegressionWithSGD()
    .setInitialWeights(Vectors.zeros(numFeatures))
    .setNumIterations(1)
    .setStepSize(0.1)
    .setMiniBatchFraction(1.0)

  model.trainOn(dstream)

  // predictOnValues keeps the key and replaces the value with the prediction,
  // so each element of `rdd` is (label, prediction).
  model.predictOnValues(dstream.map(lp => (lp.label, lp.features))).foreachRDD { rdd =>
    // Register the table first so that a failure while computing metrics
    // cannot prevent it from becoming queryable. Column order is unchanged:
    // _1 = label, _2 = prediction.
    rdd.toDF.registerTempTable("liniarRegressionModel")

    // Skip metrics for empty batches.
    // NOTE(review): assumed that RegressionMetrics fails on an empty RDD - confirm.
    if (!rdd.isEmpty()) {
      // RegressionMetrics expects (prediction, observation); the stream
      // carries (label, prediction), so the pairs are swapped here.
      val metrics = new RegressionMetrics(rdd.map(_.swap))
      val mse = metrics.meanSquaredError
      val rmse = metrics.rootMeanSquaredError
      val mae = metrics.meanAbsoluteError
      val rSquared = metrics.r2
      println(s"MSE=$mse RMSE=$rmse MAE=$mae R2=$rSquared")
    }
  }

  /**
   * Starts the streaming context and blocks until it terminates.
   *
   * This lives in a method rather than in the object body: blocking inside
   * the object's initializer would keep the initialization lock held while
   * the `foreachRDD` callback (run on another thread) tries to read
   * `sqlContext` from this same object.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {
    ssc.start()
    ssc.awaitTermination()
  }
}
%sql
SELECT * FROM liniarRegressionModel LIMIT 10
當我對臨時表執行 select 時,收到以下錯誤訊息。我是在執行完第一段(paragraph)之後,才對臨時表執行 select 的。
org.apache.spark.sql.AnalysisException: Table not found: liniarRegressionModel; line 1 pos 14 at org.apache.spark.sql.catalyst.analysis.
package$AnalysisErrorAt.failAnalysis (package.scala:42) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations
$.getTable (Analyzer.scala:305) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations
$$anonfun$apply$9.applyOrElse
(Analyzer.scala:314) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations
$$anonfun$apply$9.applyOrElse(Analyzer.scala:309) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57) at org.apache.spark.sql.catalyst.trees.CurrentOrigin
$.withOrigin(TreeNode.scala:69) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators
(LogicalPlan.scala:56) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
$$anonfun$1.apply(LogicalPlan.scala:54) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply
(LogicalPlan.scala:54) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply
(TreeNode.scala:281) at scala.collection.Iterator
$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$
class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach
(Iterator.scala:1157) at scala.collection.generic.Growable $class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.
$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.
$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to
(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to
(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer
(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer
(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray
(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray
(Iterator.scala:1157)
你可以把其餘的 Scala 程式碼也貼上來嗎? – zero323
你執行了第一段嗎? –
是的。我執行第一段。 –