2017-08-08 49 views
1

我想將此結構的數據框更改爲第二個。使用groupby或aggregate來合併RDD或DataFrame中的每個事務中的項目以進行FP增長

+---+-----+-----+ 
| id|order|items| 
+---+-----+-----+ 
| 0| a| 1| 
| 1| a| 2| 
| 2| a| 5| 
| 3| b| 1| 
| 4| b| 2| 
| 5| b| 3| 
| 6| b| 5| 
| 7| c| 1| 
| 8| c| 2| 
+---+-----+-----+ 

它改成這樣:

+---+-----+------------+ 
| id|order|  items| 
+---+-----+------------+ 
| 0| a| [1, 2, 5]| 
| 1| b|[1, 2, 3, 5]| 
| 2| c|  [1, 2]| 
+---+-----+------------+ 

我如何能做到這一點的Pyspark?

回答

0

Groupby以便與collect_list功能,並與row_number一個唯一的ID,你的情況應該工作

from pyspark.sql import functions as F 
df.groupBy("order").agg(F.collect_list("items")) 
    .withColumn("id", F.row_number().over(Window.orderBy("order"))) 

希望這有助於!

+0

.withColumn( 「ID」,F.monotonically_increasing_id())這段代碼不能正常工作,它給了ID喜歡1047972020224,一個隨機數。另一個問題是,當我用十億行處理一個非常大的數據時,這種方法很快? – Oak

+0

更新了答案,請檢查 –

+0

如果我不使用Dataframe但RDD具有相同的數據,你知道我該怎麼做同樣的關聯分析? @Shankar Koirala – Oak

1

你可以做

from pyspark.sql.functions import * 
df.groupBy(df.order).agg(collect_list("items").alias("items")) 

編輯

的情況下,你想要做的RDD一樣的,你可以做以下(斯卡拉)

rdd.groupBy(x => x._2).mapValues(x => x.map(y => y._3)).zipWithIndex() 

鑑於RDD as

(0,a,1) 
(1,a,2) 
(2,a,5) 
(3,b,1) 
(4,b,2) 
(5,b,3) 
(6,b,5) 
(7,c,1) 
(8,c,2) 

結果將是

((a,List(1, 2, 5)),0) 
((b,List(1, 2, 3, 5)),1) 
((c,List(1, 2)),2) 
+0

謝謝,這種方法在處理十億行的超大型數據時足夠快嗎? – Oak

+0

是的,應該是:) –

+0

太好了,謝謝! :) – Oak

相關問題