2016-08-06 71 views
1

我有一個含有不同模式的spark數據框的列表。例如:需要未知列數的Spark UDF

list_df = [df1, df2, df3, df4] 
# df1.columns = ['a', 'b'] 
# df2.columns = ['a', 'b', 'c'] 
# df3.columns = ['a', 'b', 'c', 'd'] 
# df4.columns = ['a', 'b', 'c', 'd', 'e'] 

現在,我想編寫一個單一的udf,它能夠在具有不同列數的數據幀列表上操作。

以前有一篇關於如何使用scala的文章:Spark UDF with varargs,其中udf包含一列數組。

但似乎該方法不適用於python。有什麼建議麼?

謝謝。

回答

3

其實這個方法效果就好在Python:

from pyspark.sql.functions import array, udf 

df = sc.parallelize([("a", "b", "c", "d")]).toDF() 

f = udf(lambda xs: "+".join(xs)) 

df.select(f("_1")).show() 
## +------------+ 
## |<lambda>(_1)| 
## +------------+ 
## |   a| 
## +------------+ 

df.select(f(array("_1", "_2"))).show() 
## +-----------------------+ 
## |<lambda>(array(_1, _2))| 
## +-----------------------+ 
## |     a+b| 
## +-----------------------+ 

df.select(f(array("_1", "_2", "_3"))).show() 
## +---------------------------+ 
## |<lambda>(array(_1, _2, _3))| 
## +---------------------------+ 
## |      a+b+c| 
## +---------------------------+ 

因爲Python UDF是不一樣的類型像他們斯卡拉對應的實體都沒有通過輸入參數也使用args來的種類和數量的限制:

g = udf(lambda *xs: "+".join(xs)) 

df.select(g("_1", "_2", "_3", "_4")).show() 
## +------------------------+ 
## |<lambda>(_1, _2, _3, _4)| 
## +------------------------+ 
## |     a+b+c+d| 
## +------------------------+ 

避免與array包裝輸入。

您還可以使用struct作爲替代的包裝去的列名訪問:

h = udf(lambda row: "+".join(row.asDict().keys())) 

df.select(h(struct("_1", "_2", "_3"))).show() 
## +----------------------------+ 
## |<lambda>(struct(_1, _2, _3))| 
## +----------------------------+ 
## |     _1+_3+_2| 
## +----------------------------+ 
+0

非常感謝!有用。 – Yiliang

+0

一個相關的問題:有沒有一種方法可以訪問udf中的列名,這樣我就可以從正確的字段獲取值?謝謝。 – Yiliang

+0

你可以試試struct。 – zero323