1
我需要使用我自己的用戶定義函數來過濾Spark數據幀。我的數據框是使用jdbc連接從數據庫中讀取的,然後在進行過濾之前通過spark中的自聯接操作。在嘗試過濾後的數據幀collect
時發生錯誤。自加入後使用UDF的Spark 2.0過濾器
我一直在Spark 1.6中成功使用它。然而,在升級到2.0後,昨日它失敗,錯誤:
py4j.protocol.Py4JJavaError: An error occurred while calling o400.collectToPython.
: java.lang.UnsupportedOperationException: Cannot evaluate expression:
<lambda>(input[0, string, true])
這裏是產生錯誤(在我的環境),一個小例子:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType
spark = SparkSession.builder.master('local').appName('test').getOrCreate()
# this works successfully
df = spark.createDataFrame([('Alice', 1), ('Bob', 2), ('Dan', None)],
['name', 'age'])
df.filter(udf(lambda x: 'i' in x, BooleanType())(df.name)).collect()
>>> [Row(name=u'Alice', age=1)]
# this produces the error
df_emp = spark.createDataFrame([(1, 'Alice', None), (2, 'Bob', 1),
(3, 'Dan', 2), (4, 'Joe', 2)],
['id', 'name', 'manager_id'])
df1 = df_emp.alias('df1')
df2 = df_emp.alias('df2')
cols = df1.columns
# the self-join
result = df1.join(df2, col('df1.id') == col('df2.manager_id'), 'left_outer')
result.collect()
>>> [Row(id=1, name=u'Alice', manager_id=None),
Row(id=3, name=u'Dan', manager_id=2), Row(id=2, name=u'Bob', manager_id=1),
Row(id=2, name=u'Bob', manager_id=1), Row(id=4, name=u'Joe', manager_id=2)]
# simple udf filter
filtered = result.filter(udf(lambda x: 'i' in x, BooleanType())(result.name))
filtered.collect()
# the above error is produced...
難道我做錯什麼在這種情況下, ?這是2.0中的一個錯誤還是應該考慮兩個版本之間的行爲改變?
我一直在敲我的頭掛在牆上,因爲在之前的會議上工作了UDF是失敗。這救了我!謝謝蒂姆! –