
I have a log file made up of "Events", "Time", and "UserId" columns. I want to compute the average time between a user's events with pySpark.

+--------+----------------+--------+
| Events | Time           | UserId |
+--------+----------------+--------+
| ClickA | 7/6/16 10:00am | userA  |
+--------+----------------+--------+
| ClickB | 7/6/16 12:00am | userA  |
+--------+----------------+--------+

I want to compute, for each user, the average time between their events. How would you approach this? In a traditional programming setting I would walk through each of a user's events, compute the time delta between events n and n-1, and append that value to an array A; then I would take the average of the values in A. How can I do this with Spark?

Answer


Ignoring the date parsing, this looks like a job for a window function followed by a simple aggregation, so roughly you need something like this:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lag, avg}

// toDF and the $"..." column syntax need the session implicits
// (already in scope in spark-shell)
import spark.implicits._

val df = Seq(
    ("ClickA", "2016-06-07 10:00:00", "UserA"),
    ("ClickB", "2016-06-07 12:00:00", "UserA")
).toDF("events", "time", "userid").withColumn("time", $"time".cast("timestamp"))

// Per-user window ordered by event time
val w = Window.partitionBy("userid").orderBy("time")

// Difference between consecutive events in seconds
val diff = $"time".cast("long") - lag($"time", 1).over(w).cast("long")

// Average gap per user
df.withColumn("diff", diff).groupBy("userid").agg(avg($"diff"))

Thanks zero323! Do you know how I can convert this string (2016/5/1 4:03:34 PM) to a timestamp? I can't find the right way to do it in pyspark. – Ahmet


Pretty much as shown here: http://stackoverflow.com/a/36095322/1560062 but you'll have to adjust the format (https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html) – zero323
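
For that comment, a minimal pyspark sketch of the conversion: unix_timestamp takes a Java SimpleDateFormat pattern, and "yyyy/M/d h:mm:ss a" is an assumed pattern for the sample "2016/5/1 4:03:34 PM"; df and the time column are the ones from the answer above.

from pyspark.sql.functions import col, unix_timestamp

# Parse "2016/5/1 4:03:34 PM"-style strings; the pattern below is an assumption
# based on the sample in the comment and may need adjusting for your data.
parsed = df.withColumn(
    "time",
    unix_timestamp(col("time"), "yyyy/M/d h:mm:ss a").cast("timestamp")
)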