2017-10-05 79 views
1

我正嘗試在現有的一組列上使用groupby聚合在Pyspark中創建一個列的新列。一個示例的輸入數據幀被提供如下:collect_list通過保留基於另一個變量的順序

------------------------ 
id | date  | value 
------------------------ 
1 |2014-01-03 | 10 
1 |2014-01-04 | 5 
1 |2014-01-05 | 15 
1 |2014-01-06 | 20 
2 |2014-02-10 | 100 
2 |2014-03-11 | 500 
2 |2014-04-15 | 1500 

預期的輸出是:

id | value_list 
------------------------ 
1 | [10, 5, 15, 20] 
2 | [100, 500, 1500] 

列表中的值是由日期排序。

我嘗試使用collect_list如下:

from pyspark.sql import functions as F 
ordered_df = input_df.orderBy(['id','date'],ascending = True) 
grouped_df = ordered_df.groupby("id").agg(F.collect_list("value")) 

但collect_list不保證順序,即使我聚集之前日期排序輸入數據幀。

有人可以幫助如何通過保留基於第二個(日期)變量的訂單來進行聚合嗎?

回答

6

如果您以列表形式收集日期和值,則可以使用和udf根據日期對結果列進行排序,然後僅保留結果中的值。對於具體的例子

import operator 
import pyspark.sql.functions as F 

# create list column 
grouped_df = input_df.groupby("id") \ 
       .agg(F.collect_list(F.struct("date", "value")) \ 
       .alias("list_col")) 

# define udf 
def sorter(l): 
    res = sorted(l, key=operator.itemgetter(0)) 
    return [item[1] for item in res] 

sort_udf = F.udf(sorter) 

# test 
grouped_df.select("id", sort_udf("list_col") \ 
    .alias("sorted_list")) \ 
    .show(truncate = False) 
+---+----------------+ 
|id |sorted_list  | 
+---+----------------+ 
|1 |[10, 5, 15, 20] | 
|2 |[100, 500, 1500]| 
+---+----------------+ 
+0

謝謝...我只是試圖在幾百萬更大的數據和我收到完全相同的順序爲collect_list的...有沒有辦法來解釋爲什麼這可能正在發生?此外,檢查collect_list似乎只是混淆了在一個日期內具有多個值的情況...是否意味着collect_list還維護了訂單? – Ravi

+1

在你的代碼中,你需要在collect_list()之前排序整個數據集。但是這不是必須的,在收集列表中的日期和值之後對結果列表進行排序會更有效。 – mtoto

+0

只是澄清...排序列和使用collect_list排序列將保存順序? – Ravi

相關問題