2017-07-24

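pyspark - Create top-3 groups and aggregate the other groups/rows

I want to create a new dataframe in which the type column keeps only the topX types based on the highest count. There will be one additional type (other), which is the sum of all the remaining typeX rows of the same name group.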

For the DF:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

data = spark.createDataFrame([
    ("name1", "type1", 2), ("name1", "type2", 1), ("name1", "type3", 4), ("name1", "type3", 5),
    ("name2", "type1", 6), ("name2", "type1", 7), ("name2", "type2", 8)
], ["name", "type", "cnt"])
data.printSchema()

Which is:

|name  |type |cnt|
|------|-----|---|
|name1 |typeA|  6|
|name1 |typeX|  5|
|name1 |typeW|  3|
|name1 |typeZ|  1|
|name2 |typeA|  7|
|name2 |typeB|  2|
| .... | ... |   |

The resulting dataframe (for top 2) would be as follows: each name has its top-2 values plus 'other' (3 groups per name).

|name  |type |cnt|
|------|-----|---|
|name1 |typeA|  6|
|name1 |typeX|  5|
|name1 |other|  4|
|name2 |typeA|  7|
|name2 |typeB|  2|
|name2 |other|  0|
| .... | ... |   |

I'm not sure how I can skip the first X rows of a group and then aggregate the remaining rows.
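To pin down what the expected output means, here is a minimal plain-Python sketch (no Spark) of the "top N plus other" rule applied to the table above. It assumes rows are ranked individually by cnt, that ties are broken by type name, and that every name gets an other row, with 0 when nothing is left over (as in the name2 example). The function name top_n_with_other is hypothetical.

```python
from collections import defaultdict


def top_n_with_other(rows, n):
    """Hypothetical helper: keep the n largest counts per name, sum the rest.

    rows: iterable of (name, type, cnt) tuples. Every name gets an 'other'
    row; its cnt is 0 when the name has n or fewer rows.
    """
    by_name = defaultdict(list)
    for name, typ, cnt in rows:
        by_name[name].append((typ, cnt))

    result = []
    for name in sorted(by_name):
        # Sort by count descending; ties broken by type name for determinism.
        ranked = sorted(by_name[name], key=lambda tc: (-tc[1], tc[0]))
        top, rest = ranked[:n], ranked[n:]
        result.extend((name, typ, cnt) for typ, cnt in top)
        # Everything after the first n rows collapses into one 'other' row.
        result.append((name, "other", sum(cnt for _, cnt in rest)))
    return result


rows = [
    ("name1", "typeA", 6), ("name1", "typeX", 5), ("name1", "typeW", 3), ("name1", "typeZ", 1),
    ("name2", "typeA", 7), ("name2", "typeB", 2),
]
for row in top_n_with_other(rows, 2):
    print(row)
```

With n = 2 this reproduces the six rows of the expected table: name1 keeps typeA and typeX with other = 3 + 1 = 4, and name2 keeps both of its rows with other = 0.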


Does each name have duplicate types? Your code doesn't seem to produce the table you show. – Psidom

Answer


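I used a window function to rank the rows within each name by cnt, then kept the top two ranks for each name, aggregated the remaining rows into an "other" row, and finally unioned the two results.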

>>> from pyspark.sql import SparkSession 
>>> spark = SparkSession.builder.getOrCreate() 
>>> data = spark.createDataFrame([
...     ("name1", "type1", 2), ("name1", "type2", 1), ("name1", "type3", 4), ("name1", "type3", 5),
...     ("name2", "type1", 6), ("name2", "type1", 7), ("name2", "type2", 8)
... ], ["name", "type", "cnt"])
>>> data.show() 
+-----+-----+---+
| name| type|cnt|
+-----+-----+---+
|name1|type1|  2|
|name1|type2|  1|
|name1|type3|  4|
|name1|type3|  5|
|name2|type1|  6|
|name2|type1|  7|
|name2|type2|  8|
+-----+-----+---+

>>> from pyspark.sql.window import Window 
>>> from pyspark.sql.functions import rank, col, lit
>>> window = Window.partitionBy(data['name']).orderBy(data['cnt'].desc()) 
>>> data1 = data.select('*', rank().over(window).alias('rank')) 
>>> data1.show() 
+-----+-----+---+----+
| name| type|cnt|rank|
+-----+-----+---+----+
|name1|type3|  5|   1|
|name1|type3|  4|   2|
|name1|type1|  2|   3|
|name1|type2|  1|   4|
|name2|type2|  8|   1|
|name2|type1|  7|   2|
|name2|type1|  6|   3|
+-----+-----+---+----+
>>> data2 = data1.filter(data1['rank'] > 2).groupby('name').sum('cnt').select('name',lit('other').alias('type'),col('sum(cnt)').alias('cnt')) 
>>> data2.show() 
+-----+-----+---+
| name| type|cnt|
+-----+-----+---+
|name1|other|  3|
|name2|other|  6|
+-----+-----+---+
>>> data1.filter(data1['rank'] <=2).select('name','type','cnt').union(data2).show() 
+-----+-----+---+
| name| type|cnt|
+-----+-----+---+
|name1|type3|  5|
|name1|type3|  4|
|name2|type2|  8|
|name2|type1|  7|
|name1|other|  3|
|name2|other|  6|
+-----+-----+---+