3

我目前正在使用Spark 2.1,並有一個主腳本調用一個包含我所有轉換方法的輔助模塊。換句話說:PySpark 2.1:UDF中斷的導入模塊Hive連接

main.py 
helper.py 

在我helper.py文件的頂部,我有以下方式定義了幾個自定義的UDF:

def reformat(s): 
    return reformat_logic(s) 
reformat_udf = udf(reformat, StringType()) 

我打破了之前關閉所有的UDF到幫助文件,我可以使用spark.sql('sql statement')通過我的SparkSession對象連接到我的Hive Metastore。但是,在將UDF移動到幫助程序文件並在主腳本的頂部導入該文件後,SparkSession對象不能再連接到Hive並返回到默認的Derby數據庫。嘗試查詢我的蜂巢表,例如Hive support is required to insert into the following tables...

我已經能夠通過移動我的UDF到一個完全獨立的文件,只運行import語句爲需要的功能內部的模塊來解決我的問題,當我還得到錯誤他們(不知道這是否是好的做法,但它的工作原理)。無論如何,有沒有人知道爲什麼我看到Spark和UDF的特殊行爲?有沒有人知道跨應用程序共享UDF的好方法?

+3

請看[SPARK-19163](https://issues.apache.org/jira/browse/SPARK-19163)。 udf'需要一個活動的會話才能工作,所以如果不存在的話,它會初始化一個會話。如果在單獨的模塊中創建UserDefinedFunction對象,則必須在初始化會話後導入它。 – zero323

+2

謝謝@ zero323!這正是我遇到的問題。很高興看到它將被固定在2.2。 – vertigokidd

回答

2

Spark之前2.2.0 UserDefinedFunction熱切地創建UserDefinedPythonFunction對象,它代表JVM上的Python UDF。此過程需要訪問SparkContextSparkSession。如果在調用UserDefinedFunction.__init__時沒有活動實例,則Spark將自動爲您初始化上下文。

當您在導入UserDefinedFunction對象後調用SparkSession.Builder.getOrCreate時,它將返回現有的SparkSession實例,並且只能應用一些配置更改(其中不包括enableHiveSupport)。

from pyspark.sql.session import SparkSession 

spark = SparkSession.builder.enableHiveSupport().getOrCreate() 

from helper import reformat_udf 

此行爲SPARK-19163描述並固定在星火2.2.0:

要導入UDF之前解決這個問題,你應該初始化SparkSession。其他API改進包括裝飾器語法(SPARK-19160)和改進的文檔字符串處理(SPARK-19161)。