2016-07-05 202 views
0

我有用戶遊戲會話,其中包含:用戶ID,遊戲ID,分數和遊戲進行時的時間戳。從後續項目中總結第一個分組的項目

from pyspark import SparkContext 
from pyspark.sql import HiveContext 
from pyspark.sql import functions as F 

sc = SparkContext("local") 

sqlContext = HiveContext(sc) 

df = sqlContext.createDataFrame([ 
    ("u1", "g1", 10, 0), 
    ("u1", "g3", 2, 2), 
    ("u1", "g3", 5, 3), 
    ("u1", "g4", 5, 4), 
    ("u2", "g2", 1, 1), 
], ["UserID", "GameID", "Score", "Time"]) 

所需的輸出

+------+-------------+-------------+ 
|UserID|MaxScoreGame1|MaxScoreGame2| 
+------+-------------+-------------+ 
| u1|   10|   5| 
| u2|   1|   null| 
+------+-------------+-------------+ 

我要變換的數據,我得到的第一場比賽的用戶播放以及第二場比賽的最高分的最高分(獎金如果我也可以獲得所有後續遊戲的最高分數)。不幸的是,我不確定用Spark SQL可以做什麼。

我知道我可以通過UserID,GameID進行分組,然後通過agg來獲得最高分和最短時間。不知道如何從那裏繼續。

說明:請注意MaxScoreGame1和MaxScoreGame2是指第一個和第二個遊戲用戶播放器;不是GameID。

回答

1

您可以嘗試使用Window函數和Pivot的組合。

  1. 獲取按時間排序的用戶ID劃分的每個遊戲的行號。
  2. 過濾掉GameNumber爲1或2.
  3. 旋轉即可獲得所需的輸出形狀。

不幸的是我使用scala而不是python,但下面的代碼應該很容易轉換爲python庫。

import org.apache.spark.sql.expressions.Window 

// Use a window function to get row number 
val rowNumberWindow = Window.partitionBy(col("UserId")).orderBy(col("Time")) 

val output = { 
    df 
    .select(
     col("*"), 
     row_number().over(rowNumberWindow).alias("GameNumber") 
    ) 
    .filter(col("GameNumber") <= lit(2)) 
    .groupBy(col("UserId")) 
    .pivot("GameNumber") 
    .agg(
     sum(col("Score")) 
    ) 
} 

output.show() 

+------+---+----+ 
|UserId| 1| 2| 
+------+---+----+ 
| u1| 10| 2| 
| u2| 1|null| 
+------+---+----+ 
+1

另外,如果你想看到兩個以上的遊戲在輸出就是不過濾和樞軸將其餘的工作補充。 – Blakey

+0

窗口和row_number做了訣竅。我將在PySpark中發佈我的解決方案,它有點不同。你可以驗證你的代碼是否適合演出,所以我可以給你答案? – ksindi

+1

剛剛更新輸出,也注意到我實際上使用select而不是groupBy在數據透視表上,這是行不通的。根據你的帖子(「u1」,「g3」,2,2),(「u1」,「u1」),對於你如何得到5作爲用戶1的第二遊戲得分,假設原始數據框中存在拼寫錯誤「g3」,5,3), – Blakey

1

解決方案與PySpark:

from pyspark.sql import Window 

rowNumberWindow = Window.partitionBy("UserID").orderBy(F.col("Time")) 

(df 
.groupBy("UserID", "GameID") 
.agg(F.max("Score").alias("Score"), 
     F.min("Time").alias("Time")) 
.select(F.col("*"), 
     F.row_number().over(rowNumberWindow).alias("GameNumber")) 
.filter(F.col("GameNumber") <= F.lit(2)) 
.withColumn("GameMaxScoreCol", F.concat(F.lit("MaxScoreGame"), F.col("GameNumber"))) 
.groupBy("UserID") 
.pivot("GameMaxScoreCol") 
.agg(F.max("Score")) 
).show() 

+------+-------------+-------------+ 
|UserID|MaxScoreGame1|MaxScoreGame2| 
+------+-------------+-------------+ 
| u1|   10|   5| 
| u2|   1|   null| 
+------+-------------+-------------+ 
相關問題