2017-07-03 95 views
0

排序名單上有一個數據幀,其中列有一個稱爲stopped是:UDF在pyspark

+--------------------+ 
|    stopped| 
+--------------------+ 
|[nintendo, dsi, l...| 
|[nintendo, dsi, l...| 
| [xl, honda, 500]| 
|[black, swan, green]| 
|[black, swan, green]| 
|[pin, stripe, sui...| 
| [shooting, braces]| 
|  [haus, geltow]| 
|[60, cm, electric...| 
| [yamaha, yl1, yl2]| 
|[landwirtschaft, ...| 
|  [wingbar, 9581]| 
|  [gummi, 16mm]| 
|[brillen, lupe, c...| 
|[man, city, v, ba...| 
|[one, plus, one, ...| 
|  [kapplocheisen]| 
|[tractor, door, m...| 
|[pro, nano, flat,...| 
|[kaleidoscope, to...| 
+--------------------+ 

我想創建一個包含相同的列表,但這裏的關鍵詞是有序另一列。

據我瞭解,我需要創建一個UDF是需要返回一個列表:

udf_sort = udf(lambda x: x.sort(), ArrayType(StringType())) 
ps_clean.select("*", udf_sort(ps_clean["stopped"])).show(5, False) 

,我也得到:

+---------+----------+---------------------+------------+--------------------------+--------------------------+-----------------+ 
|client_id|kw_id  |keyword    |max_click_dt|tokenized     |stopped     |<lambda>(stopped)| 
+---------+----------+---------------------+------------+--------------------------+--------------------------+-----------------+ 
|710  |4304414582|nintendo dsi lite new|2017-01-06 |[nintendo, dsi, lite, new]|[nintendo, dsi, lite, new]|null    | 
|705  |4304414582|nintendo dsi lite new|2017-03-25 |[nintendo, dsi, lite, new]|[nintendo, dsi, lite, new]|null    | 
|707  |647507047 |xl honda 500 s  |2016-10-26 |[xl, honda, 500, s]  |[xl, honda, 500]   |null    | 
|710  |26308464 |black swan green  |2016-01-01 |[black, swan, green]  |[black, swan, green]  |null    | 
|705  |26308464 |black swan green  |2016-07-13 |[black, swan, green]  |[black, swan, green]  |null    | 
+---------+----------+---------------------+------------+--------------------------+--------------------------+-----------------+ 

爲什麼排序不適用?

回答

2

x.sort()通常排序到位名單(但我懷疑它不會做在一個pyspark數據框中),它返回None。這就是你的專欄標籤<lambda>(stopped)的所有null的值。 sorted(x)將對列表進行排序並返回新的排序副本。所以,用

udf_sort = udf(lambda x: sorted(x), ArrayType(StringType())) 

應該可以解決你的問題。

或者,您可以使用內置函數sort_array而不是定義您自己的udf。

from pyspark.sql.functions import sort_array 

ps_clean.select("*", sort_array(ps_clean["stopped"])).show(5, False) 

這種方法是乾淨了一點,實際上,你可以期望得到一些性能提升,因爲pyspark沒有序列化你的UDF。