2015-12-30 27 views
1

對不起,有一個新手問題。Spark:如何根據userId和時間戳創建sessionId

目前我有日誌文件,其中包含諸如userId,event和timestamp等字段,同時缺少sessionId。我的目標是根據時間戳和預先定義的值TIMEOUT爲每條記錄創建一個sessionId。

如果超時值是10,和樣本數據幀是:

scala> eventSequence.show(false) 

    +----------+------------+----------+ 
    |uerId  |event  |timestamp | 
    +----------+------------+----------+ 
    |U1  |A   |1   | 
    |U2  |B   |2   | 
    |U1  |C   |5   | 
    |U3  |A   |8   | 
    |U1  |D   |20  | 
    |U2  |B   |23  | 
    +----------+------------+----------+ 

的目標是:

+----------+------------+----------+----------+ 
    |uerId  |event  |timestamp |sessionId | 
    +----------+------------+----------+----------+ 
    |U1  |A   |1   |S1  | 
    |U2  |B   |2   |S2  | 
    |U1  |C   |5   |S1  | 
    |U3  |A   |8   |S3  | 
    |U1  |D   |20  |S4  | 
    |U2  |B   |23  |S5  | 
    +----------+------------+----------+----------+ 

我發現在R(Create a "sessionID" based on "userID" and differences in "timeStamp")一個解決方案,而我不能在Spark中找出它。

感謝您對此問題的任何建議。

+0

的可能的複製[如何在Spark數據幀添加一列?(http://stackoverflow.com/questions/ 32788322 /如何添加列火花數據框) –

+0

這不是一個重複的問題作爲鏈接的問題。鏈接的問題顯示了「如何在DataFrame中添加新列」,而我需要的是「如何在DataFrame中計算新列值(如sessionId) – Torrence

回答

0

的 「如何創建一個新的欄目」 肖恩的回答問候,而我的目標是「如何根據時間戳創建sessionId列」。經過幾天的努力,Window函數在這種情況下被用作一個簡單的解決方案。因爲火花1.4

窗口被引入,它提供的功能,需要這樣的操作時:

兩者上的一組行的操作,同時仍然爲每一輸入行

在返回單個值爲了創建基於時間戳的sessionId,首先我需要獲得用戶A的兩個直接操作之間的區別。 windowDef定義的Window將由「userId」分區,並按時間戳排序,然後diff是一個列,它將爲每行返回一個值,它的值將是分區(組)中當前行之後的1行,或者null如果當前行是最後一行在此分區

def handleDiff(timeOut: Int) = { 
    udf {(timeDiff: Int, timestamp: Int) => if(timeDiff > timeOut) timestamp + ";" else timestamp + ""} 
} 
val windowDef = Window.partitionBy("userId").orderBy("timestamp") 
val diff: Column = lead(eventSequence("timestamp"), 1).over(windowDef) 
val dfTSDiff = eventSequence. 
withColumn("time_diff", diff - eventSequence("timestamp")). 
withColumn("event_seq", handleDiff(TIME_OUT)(col("time_diff"), col("timestamp"))). 
groupBy("userId").agg(GroupConcat(col("event_seq")).alias("event_seqs")) 

更新: 然後利用窗口函數應用(在大熊貓提供)「cumsum」般的操作:

// Define a Window, partitioned by userId (partitionBy), ordered by timestamp (orderBy), and delivers all rows before current row in this partition as frame (rowsBetween) 
val windowSpec = Window.partitionBy("userId").orderBy("timestamp").rowsBetween(Long.MinValue, 0) 
val sessionDf = dfTSDiff. 
    withColumn("ts_diff_flag", genTSFlag(TIME_OUT)(col("time_diff"))). 
    select(col("userId"), col("eventSeq"), col("timestamp"), sum("ts_diff_flag").over(windowSpec).alias("sessionInteger")). 
    withColumn("sessionId", genSessionId(col("userId"), col("sessionInteger"))) 

此前: 然後按「;」分割。並獲得每個會話,創建一個sessionId;之後按「,」分解並爆炸至最終結果。因此sessionId是在字符串操作的幫助下創建的。 (這部分應該用累計和運算代替,但是我沒有找到一個好的解決方案)

歡迎任何關於這個問題的想法或想法。


GroupConcat可以在這裏找到:SPARK SQL replacement for mysql GROUP_CONCAT aggregate function

參考:databricks introduction

-1

dt.withColumn( '的sessionId',expression for the new column sessionId
例如:
dt.timestamp +預先定義的值TIMEOUT