2016-10-13 40 views
0

如何獲取數據幀scala中的日期差異(兩者之間的天數)?SPARK:如何獲取SCALA中的數據幀列和時間戳之間的日差

我有一個DF:[id: string, itemName: string, eventTimeStamp: timestamp]和開始時間(時刻字符串)如何獲取一列 「Daydifference」 - 天之間(開始時間 - 時間戳)

我的代碼:

初始DF:

+------------+-----------+-------------------------+ 
| id  | itemName | eventTimeStamp  | 
---------------------------------------------------- 
| 1  | TV  | 2016-09-19T00:00:00Z | 
| 1  | Movie | 2016-09-19T00:00:00Z | 
| 1  | TV  | 2016-09-26T00:00:00Z | 
| 2  | TV  | 2016-09-18T00:00:00Z | 

我需要根據ID和ITEMNAME獲得最新eventTimeStamp,所以我所做的:

val result = df.groupBy("id", "itemName").agg(max("eventTimeStamp") as "mostRecent") 


    +------------+-----------+-------------------------+ 
    | id  | itemName | mostRecent   | 
    ---------------------------------------------------- 
    | 1  | TV  | 2016-09-26T00:00:00Z | 
    | 1  | Movie | 2016-09-19T00:00:00Z | 
    | 2  | TV  | 2016-09-26T00:00:00Z | 

現在我需要得到mostRecent和開始時間(2016-09-29T00:00:00Z)之間的時間差,這樣我就可以得到:

{ id : 1, {"itemMap" : {"TV" : 3, "Movie" : 10 }} } 
    { id : 2, {"itemMap" : {"TV" : 3}} } 

我想是這樣的:

 val startTime = "2016-09-26T00:00:00Z" 

    val result = df.groupBy("id", "itemName").agg(datediff(startTime, max("eventTimeStamp")) as Daydifference) 


case class Data (itemMap : Map[String, Long]) extends Serializable 


result.map{ 
    case r => 
    val id = r.getAs[String]("id") 
    val itemName = r.getAs[String]("itemName") 
    val Daydifference = r.getAs[Long]("Daydifference") 

    (id, Map(itemName -> Daydifference)) 

}.reduceByKey((x, y) => x ++ y).map{ 
     case (k, v) => 
     (k, JacksonUtil.toJson(Data(v))) 
    } 

但在網站上出現錯誤。有人能告訴我,我該如何實現這一目標?

+0

我只知道pyspark API,但使用UserDefinedFunctions你可以嘗試做類似的工作作爲[這裏](http://stackoverflow.com/questions/30283415/calculating-duration-by-subtracting-two-datetime-columns-in-string-format): –

回答

0

當您想在DataFrame中使用某個常量(「文字」)值作爲Column時,應該使用lit(...)函數。這裏的另一個錯誤是試圖用一個String作爲startDate,把它比作一個時間戳列,您可以使用java.sql.Date

val startTime = new java.sql.Date(2016, 8, 26) // beware, months are Zero-based 

val result = df.groupBy("id", "itemName") 
    .agg(datediff(lit(startTime), max("eventTimeStamp")) as "Daydifference") 

result.show() 
// +---+--------+-------------+ 
// | id|itemName|Daydifference| 
// +---+--------+-------------+ 
// | 1| Movie|   7| 
// | 1|  TV|   0| 
// | 2|  TV|   0| 
// +---+--------+-------------+ 
相關問題