2016-11-30 19 views
0

我有一個像pyspark GROUPBY和最大值選擇

name city  date 
satya Mumbai 13/10/2016 
satya Pune 02/11/2016 
satya Mumbai 22/11/2016 
satya Pune 29/11/2016 
satya Delhi 30/11/2016 
panda Delhi 29/11/2016 
brata BBSR 28/11/2016 
brata Goa  30/10/2016 
brata Goa  30/10/2016 

我需要尋找出最優先的城市每個名稱和邏輯一pyspark數據幀是「如果有最大的城市沒有采取城市爲fav_city城市。聚合「名稱」+「城市」對中的事件。如果發現多個相同的事件,則考慮具有最新日期的城市。將解釋:

d = df.groupby('name','city').count() 
#name city count 
brata Goa 2 #clear favourite 
brata BBSR 1 
panda Delhi 1 #as single so clear favourite 
satya Pune 2 ##Confusion 
satya Mumbai 2 ##confusion 
satya Delhi 1 ##shd be discard as other cities having higher count than this city 

#So get cities having max count 
dd = d.groupby('name').agg(F.max('count').alias('count')) 
ddd = dd.join(d,['name','count'],'left') 
#name count city 
brata 2 Goa #fav found 
panda 1 Delhi #fav found 
satya 2 Mumbai #can't say 
satya 2 Pune #can't say 

在用戶薩蒂亞「我需要回去trx_history和獲得最新的日期有equal_max算我的城市案例:從「孟買」或「普納」這是最後的成交E(最大日期),將該城市視爲fav_city。在這種情況下,'Pune'爲'29/11/2016'爲最新/最大日期。

但是我無法進一步如何完成這項工作。

請幫助我的邏輯或如果有更好的解決方案(更快/緊湊的方式),請建議。謝謝。

回答

2

首先轉換日期爲DateType

df_with_date = df.withColumn(
    "date", 
    F.unix_timestamp("date", "dd/MM/yyyy").cast("timestamp").cast("date") 
) 

下一頁groupBy用戶和城市,但延長的聚集是這樣的:

df_agg = (df_with_date 
    .groupBy("name", "city") 
    .agg(F.count("city").alias("count"), F.max("date").alias("max_date"))) 

定義一個窗口:

from pyspark.sql.window import Window 

w = Window().partitionBy("name").orderBy(F.desc("count"), F.desc("max_date")) 

添加排名:

df_with_rank = (df_agg 
    .withColumn("rank", F.dense_rank().over(w))) 

和過濾:

result = df_with_rank.where(F.col("rank") == 1) 

您可以檢測剩餘使用這樣的代碼重複:

import sys 

final_w = Window().partitionBy("name").rowsBetween(-sys.maxsize, sys.maxsize) 
result.withColumn("tie", F.count("*").over(final_w) != 1)