2015-12-29

A sample table is given below. I want to get the median of the "value" column for each group in the "Source" column, where the "Source" column is of string type and the "value" column is of double type. How can I compute this for a double-typed column in Spark's sqlContext?

scala> sqlContext.sql("SELECT * from tTab order by source").show 

+---------------+-----+
|         Source|value|
+---------------+-----+
|131.183.222.110|  1.0|
| 131.183.222.85|  1.0|
| 131.183.222.85|  0.0|
| 131.183.222.85|  0.5|
| 131.183.222.85|  1.0|
| 131.183.222.85|  1.0|
|   43.230.146.7|  0.0|
|   43.230.146.7|  1.0|
|   43.230.146.7|  1.0|
|   43.230.146.8|  1.0|
|   43.230.146.8|  1.0|
+---------------+-----+

scala> tTab.printSchema 

root 
|-- Source: string (nullable = true) 
|-- value: double (nullable = true) 

Expected answer:

+---------------+-----+
|         Source|value|
+---------------+-----+
|131.183.222.110|  1.0|
| 131.183.222.85|  1.0|
|   43.230.146.7|  1.0|
|   43.230.146.8|  1.0|
+---------------+-----+

If the "value" column were an int, the query below would work. Since the data type of "value" is double, it gives me this error:

sqlContext.sql("SELECT source , percentile(value,0.5) OVER (PARTITION BY source) AS Median from tTab ").show 

Error:

org.apache.hadoop.hive.ql.exec.NoMatchingMethodException: No matching method for class org.apache.hadoop.hive.ql.udf.UDAFPercentile with (double, double). Possible choices: _FUNC_(bigint, array<double>) _FUNC_(bigint, double) 
    at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getMethodInternal(FunctionRegistry.java:1164) 
    at org.apache.hadoop.hive.ql.exec.DefaultUDAFEvaluatorResolver.getEvaluatorClass(DefaultUDAFEvaluatorResolver.java:83) 
    at org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBridge.getEvaluator(GenericUDAFBridge.java:56) 
    at org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver.getEvaluator(AbstractGenericUDAFResolver.java:47) 
    at org.apache.spark.sql.hive.HiveWindowFunction.evaluator$lzycompute(hiveUDFs.scala:351) 
    at org.apache.spark.sql.hive.HiveWindowFunction.evaluator(hiveUDFs.scala:349) 
    at org.apache.spark.sql.hive.HiveWindowFunction.returnInspector$lzycompute(hiveUDFs.scala:357) 
    at org.apache.spark.sql.hive.HiveWindowFunction.returnInspector(hiveUDFs.scala:356) 
    at org.apache.spark.sql.hive.HiveWindowFunction.dataType(hiveUDFs.scala:362) 
    at org.apache.spark.sql.catalyst.expressions.WindowExpression.dataType(windowExpressions.scala:313) 
    at org.apache.spark.sql.catalyst.expressions.Alias.toAttribute(namedExpressions.scala:140) 
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$35$$anonfun$apply$15.applyOrElse(Analyzer.scala:856) 
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$35$$anonfun$apply$15.applyOrElse(Analyzer.scala:852) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:227) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:227) 
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51) 
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:226) 
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$35.apply(Analyzer.scala:852) 
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$35.apply(Analyzer.scala:863) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$.org$apache$spark$sql$catalyst$analysis$Analyzer$ExtractWindowExpressions$$addWindow(Analyzer.scala:849) 
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$apply$16.applyOrElse(Analyzer.scala:957) 
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$apply$16.applyOrElse(Analyzer.scala:913) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:227) 
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:227) 
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51) 
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:226) 
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$.apply(Analyzer.scala:913) 
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$.apply(Analyzer.scala:745) 
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83) 
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80) 
    at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111) 
    at scala.collection.immutable.List.foldLeft(List.scala:84) 
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80) 
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72) 
    at scala.collection.immutable.List.foreach(List.scala:318) 
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72) 
    at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:916) 
    at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:916) 
    at org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:914) 
    at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:132) 
    at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51) 
    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:725) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:20) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:27) 
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29) 
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:31) 
    at $iwC$$iwC$$iwC.<init>(<console>:33) 
    at $iwC$$iwC.<init>(<console>:35) 
    at $iwC.<init>(<console>:37) 
    at <init>(<console>:39) 
    at .<init>(<console>:43) 
    at .<clinit>(<console>) 
    at .<init>(<console>:7) 
    at .<clinit>(<console>) 
    at $print(<console>) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) 
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340) 
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) 
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) 
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) 
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857) 
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902) 
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814) 
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657) 
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665) 
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670) 
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997) 
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) 
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) 
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) 
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945) 
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059) 
    at org.apache.spark.repl.Main$.main(Main.scala:31) 
    at org.apache.spark.repl.Main.main(Main.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672) 
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) 
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 

Thank you so much!

Answer


For non-integral values you should use the percentile_approx UDF:

import org.apache.spark.mllib.random.RandomRDDs 

// 1000 samples from a standard normal distribution as a single-column DataFrame
val df = RandomRDDs.normalRDD(sc, 1000, 10, 1).map(Tuple1(_)).toDF("x") 
df.registerTempTable("df") 
sqlContext.sql("SELECT percentile_approx(x, 0.5) FROM df").show 

// +--------------------+ 
// |                 _c0| 
// +--------------------+ 
// |0.035379710486199915| 
// +--------------------+ 

On a side note, you should use GROUP BY, not PARTITION BY. The latter is for window functions and will not have the effect you expect.

SELECT source, percentile_approx(value, 0.5) FROM df GROUP BY source 
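Applied to the tTab table from the question (a minimal sketch, assuming tTab is still registered as shown there), the per-source median query would look like this and should reproduce the expected answer above:

sqlContext.sql(
  "SELECT Source, percentile_approx(value, 0.5) AS median FROM tTab GROUP BY Source"
).show

// One row per Source with the (approximate) median of the double column;
// for the sample data above every group's median should come out as 1.0.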

See also How to find median using Spark


You can also do this for multiple percentiles: 'SELECT source, percentile_approx(value, array(0.25, 0.5, 0.75)) FROM df GROUP BY source' – rye
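For reference, a sketch of that multi-percentile variant run against the question's tTab table (tTab is an assumption; the comment used df):

sqlContext.sql(
  "SELECT Source, percentile_approx(value, array(0.25, 0.5, 0.75)) AS quartiles FROM tTab GROUP BY Source"
).show

// When called with an array of percentiles, percentile_approx returns an
// array<double> with one entry per requested percentile (here the quartiles).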


@rye Feel free to edit; this answer is a wiki. – zero323


@zero323 I am using Spark 1.6.1 with the Play framework. 'percentile_approx' works in 'spark-shell', but I cannot get the same code to run in 'Play Framework'. Apart from 'spark-core' and 'spark-sql', which jars are needed to make it run? Could you guide me on this? – analyticalpicasso
