
How to create a UDF with two inputs in pyspark

I'm new to pyspark. I'm trying to create a simple UDF that takes two input columns, checks whether the second one is empty and, if so, splits the first column into two values and overwrites the original columns. This is what I did:

def split(x, y): 
    if x == "EXDRA" and y == "": 
        return ("EXT", "DCHA") 
    if x == "EXIZQ" and y == "": 
        return ("EXT", "IZDA") 

udf_split = udf(split, ArrayType()) 

df = df \ 
    .withColumn("x", udf_split(df['x'], df['y'])[1]) \ 
    .withColumn("y", udf_split(df['x'], df['y'])[0]) 

But when I run this code, I get the following error:

File "<stdin>", line 1, in <module> 
TypeError: __init__() takes at least 2 arguments (1 given) 

What am I doing wrong?

Thanks,
Álvaro

Answer


I'm not entirely sure what you're trying to do, but here is how I would do it as I understand it. The TypeError comes from ArrayType(): its constructor requires an element type (for example ArrayType(StringType())), so it cannot be called with no arguments. Since the two returned values play different roles, a struct return type fits better here anyway:

from pyspark.sql.types import * 
from pyspark.sql.functions import udf, col 

def split(x, y): 
    if x == "EXDRA" and y == "": 
        return ("EXT", "DCHA") 
    if x == "EXIZQ" and y == "": 
        return ("EXT", "IZDA") 

schema = StructType([StructField("x1", StringType(), False), StructField("y1", StringType(), False)]) 
udf_split = udf(split, schema) 

Note that split falls through and returns None when neither condition matches, which is why the last row in the output further down comes out as null.

df = spark.createDataFrame([("EXDRA", ""), ("EXIZQ", ""), ("", "foo")], ("x", "y")) 

df.show() 

# +-----+---+ 
# |    x|  y| 
# +-----+---+ 
# |EXDRA|   | 
# |EXIZQ|   | 
# |     |foo| 
# +-----+---+ 

df = df \ 
.withColumn("split", udf_split(df['x'], df['y'])) \ 
.withColumn("x", col("split.x1")) \ 
.withColumn("y", col("split.y1")) 

df.printSchema() 

# root 
#  |-- x: string (nullable = true) 
#  |-- y: string (nullable = true) 
#  |-- split: struct (nullable = true) 
#  |    |-- x1: string (nullable = false) 
#  |    |-- y1: string (nullable = false) 


df.show() 

# +----+----+----------+ 
# |   x|   y|     split| 
# +----+----+----------+ 
# | EXT|DCHA|[EXT,DCHA]| 
# | EXT|IZDA|[EXT,IZDA]| 
# |null|null|      null| 
# +----+----+----------+ 
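
If you would rather keep an array return type, the fix for the original error is simply to give ArrayType the element type it requires. Here is a minimal sketch of that variant, assuming the same split function and the same input DataFrame as above (udf_split_arr and tmp are illustrative names, not part of the original code):

from pyspark.sql.types import ArrayType, StringType 
from pyspark.sql.functions import udf, col 

# ArrayType requires an element type; a bare ArrayType() raises 
# "TypeError: __init__() takes at least 2 arguments (1 given)". 
udf_split_arr = udf(split, ArrayType(StringType())) 

df = df \ 
    .withColumn("tmp", udf_split_arr(col("x"), col("y"))) \ 
    .withColumn("x", col("tmp")[0]) \ 
    .withColumn("y", col("tmp")[1]) \ 
    .drop("tmp") 

Calling the UDF once and indexing into the result also avoids evaluating it twice per row, which the two separate calls in the question would do.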