Registering a Hive custom UDF with Spark (Spark SQL) 2.0.0

2016-11-01 108 views
2

I am working with Spark 2.0.0, where my requirement is to use the 'com.facebook.hive.udf.UDFNumberRows' function in my SQL context, in one of the queries. On my Hive query cluster I simply use it as a temporary function by defining CREATE TEMPORARY FUNCTION myFunc AS 'com.facebook.hive.udf.UDFNumberRows', which is quite straightforward.

I tried to register it through the sparkSession as below, but got an error:

sparkSession.sql("""CREATE TEMPORARY FUNCTION myFunc AS 'com.facebook.hive.udf.UDFNumberRows'""") 

Error:

CREATE TEMPORARY FUNCTION rowsequence AS 'com.facebook.hive.udf.UDFNumberRows' 
16/11/01 20:46:17 ERROR ApplicationMaster: User class threw exception: java.lang.UnsupportedOperationException: Use sqlContext.udf.register(...) instead. 
java.lang.UnsupportedOperationException: Use sqlContext.udf.register(...) instead. 
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.makeFunctionBuilder(SessionCatalog.scala:751) 
    at org.apache.spark.sql.execution.command.CreateFunctionCommand.run(functions.scala:61) 
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:60) 
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:58) 
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114) 
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86) 
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86) 
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:186) 
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167) 
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:65) 
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582) 
    at com.mediamath.spark.attribution.sparkjob.SparkVideoCidJoin$.delayedEndpoint$com$mediamath$spark$attribution$sparkjob$SparkVideoCidJoin$1(SparkVideoCidJoin.scala:75) 
    at com.mediamath.spark.attribution.sparkjob.SparkVideoCidJoin$delayedInit$body.apply(SparkVideoCidJoin.scala:22) 
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34) 
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) 
    at scala.App$$anonfun$main$1.apply(App.scala:76) 
    at scala.App$$anonfun$main$1.apply(App.scala:76) 
    at scala.collection.immutable.List.foreach(List.scala:381) 
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) 
    at scala.App$class.main(App.scala:76) 
    at com.mediamath.spark.attribution.sparkjob.SparkVideoCidJoin$.main(SparkVideoCidJoin.scala:22) 
    at com.mediamath.spark.attribution.sparkjob.SparkVideoCidJoin.main(SparkVideoCidJoin.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:627) 

Does anyone know how to register the function the way Spark asks for, i.e. with the register API on the sparkSession and SQLContext:

sqlContext.udf.register(...) 

Answers

1

You can register the UDF directly using the SparkSession, e.g. sparkSession.udf.register("myUDF", (arg1: Int, arg2: String) => arg2 + arg1). Take a look at the detailed documentation here.
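
As a minimal, self-contained sketch (the table and column names here are made up), a UDF registered this way is immediately callable from SQL:

import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder().appName("udf-register-sketch").getOrCreate()
import sparkSession.implicits._

// Register a plain Scala function as a SQL function named "myUDF".
sparkSession.udf.register("myUDF", (arg1: Int, arg2: String) => arg2 + arg1)

// Hypothetical data, only to exercise the UDF.
val items = Seq((1, "a"), (2, "b")).toDF("id", "label")
items.createOrReplaceTempView("items")

sparkSession.sql("SELECT myUDF(id, label) AS combined FROM items").show()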

3

In Spark 2.0,

sparkSession.udf.register(...) 

allows you to register a Java or Scala UDF (a function of type Long => Long, for example), but not a Hive GenericUDF, which handles LongWritable instead of Long and can take a variable number of arguments.

To register a Hive UDF, your first approach was correct:

sparkSession.sql("""CREATE TEMPORARY FUNCTION myFunc AS 'com.facebook.hive.udf.UDFNumberRows'""") 

But you must enable Hive support first:

SparkSession.builder().enableHiveSupport() 

and make sure the "spark-hive" dependency is present on your classpath.
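
Put together, a sketch of the Hive-support path might look like the following (it assumes the jar containing com.facebook.hive.udf.UDFNumberRows has been shipped to the cluster, e.g. via --jars, and the table and column names in the last query are placeholders):

import org.apache.spark.sql.SparkSession

// enableHiveSupport() requires the spark-hive module on the classpath.
val sparkSession = SparkSession.builder()
  .appName("hive-udf-sketch")
  .enableHiveSupport()
  .getOrCreate()

// Register the Hive UDF as a temporary function.
sparkSession.sql("CREATE TEMPORARY FUNCTION myFunc AS 'com.facebook.hive.udf.UDFNumberRows'")

// Use it in a query; 'events' and 'user_id' are placeholder names.
sparkSession.sql("SELECT user_id, myFunc(user_id) AS row_num FROM events").show()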

Explanation:

Your error message

java.lang.UnsupportedOperationException: Use sqlContext.udf.register(...) instead 

comes from the class SessionCatalog.

By calling SparkSession.builder().enableHiveSupport(), Spark replaces the SessionCatalog with a HiveSessionCatalog, which does implement the method makeFunctionBuilder.

Finally:

The UDF you want to use, 'com.facebook.hive.udf.UDFNumberRows', was written at a time when window functions were not yet available in Hive. I suggest you use those instead (a sketch follows). You can check the Hive Reference, this Spark-SQL intro, and this if you want to stick to the Scala syntax.
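
For instance, a row-numbering similar to what UDFNumberRows provides can be expressed with the built-in row_number window function; a sketch with made-up table and column names:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val sparkSession = SparkSession.builder().appName("window-sketch").getOrCreate()

// 'events', 'user_id' and 'event_time' are placeholders for your own data.
val events = sparkSession.table("events")
val windowSpec = Window.partitionBy("user_id").orderBy("event_time")

// Number the rows within each user_id partition, ordered by event_time.
val numbered = events.withColumn("row_num", row_number().over(windowSpec))
numbered.show()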

+0

Sorry, but this is not an answer to the question - it should just be a comment –

+0

@T.Gawęda Thank you for pointing out that my answer was not clear enough. I took the time to rewrite it more clearly. – FurryMachine

0

The problem you are facing is that Spark has not loaded the library's jar into its classpath.

In our team we load external libraries with the --jars option:

/usr/bin/spark-submit --jars external_library.jar our_program.py --our_params 

You can check whether your external library was loaded in the Spark History UI, under the Environment tab (spark.yarn.secondary.jars).
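
If you prefer to verify this from the driver rather than the UI, one option (assuming sparkSession is your active SparkSession; the exact keys that show up depend on the deploy mode) is to dump the jar-related configuration:

// Print every Spark configuration entry that mentions jars,
// e.g. spark.jars or spark.yarn.secondary.jars.
sparkSession.sparkContext.getConf.getAll
  .filter { case (key, _) => key.toLowerCase.contains("jars") }
  .foreach { case (key, value) => println(s"$key = $value") }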

Then you can register the UDF as you said, once you have enabled Hive support as FurryMachine explained:

sparkSession.sql(""" 
    CREATE TEMPORARY FUNCTION myFunc AS 
    'com.facebook.hive.udf.UDFNumberRows' 
""") 

You can find more information in spark-submit --help:

hadoop:~/projects/neocortex/src$ spark-submit --help 
Usage: spark-submit [options] <app jar | python file> [app arguments] 
Usage: spark-submit --kill [submission ID] --master [spark://...] 
Usage: spark-submit --status [submission ID] --master [spark://...] 
Usage: spark-submit run-example [options] example-class [example args] 

Options: 
    --master MASTER_URL   spark://host:port, mesos://host:port, yarn, or local. 
    --deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or 
           on one of the worker machines inside the cluster ("cluster") 
           (Default: client). 
    --class CLASS_NAME   Your application's main class (for Java/Scala apps). 
    --name NAME     A name of your application. 
    --jars JARS     Comma-separated list of local jars to include on the driver 
           and executor classpaths.