2015-09-07 131 views
8

我有一個包含2列的數據幀:timestamp,值 timestamp是自時間以來的時間,值是浮點值。 我想合併行的平均值分鐘。 這意味着我想要獲取時間戳來自同一輪分鐘(自紀元以來60秒的時間間隔)的所有行,並將它們合併到單個行中,其中值列將是所有值的平均值。將火花數據幀中的多行合併到一行中

舉一個例子,讓我們假設我的數據幀看起來像這樣:

timestamp  value 
---------  ----- 
1441637160  10.0 
1441637170  20.0 
1441637180  30.0 
1441637210  40.0 
1441637220  10.0 
1441637230  0.0 

第一4行是相同的分的一部分(1441637160%60 == 0,1441637160 + 60 == 1441637220) 最後2行是另一分鐘的一部分。 我想合併相同分鐘的所有行。得到如下結果:

timestamp  value 
---------  ----- 
1441637160  25.0 (since (10+20+30+40)/4 = 25) 
1441637220  5.0 (since (10+0)/2 = 5) 

這樣做的最佳方法是什麼?

回答

5

您可以簡單地進行分組和聚合。隨着數據爲:

val df = sc.parallelize(Seq(
    (1441637160, 10.0), 
    (1441637170, 20.0), 
    (1441637180, 30.0), 
    (1441637210, 40.0), 
    (1441637220, 10.0), 
    (1441637230, 0.0))).toDF("timestamp", "value") 

進口所需的功能和類:

import org.apache.spark.sql.functions.{lit, floor} 
import org.apache.spark.sql.types.IntegerType 

創建間隔柱:

val tsGroup = (floor($"timestamp"/lit(60)) * lit(60)) 
    .cast(IntegerType) 
    .alias("timestamp") 

,並用它來進行聚合:

df.groupBy(tsGroup).agg(mean($"value").alias("value")).show 

// +----------+-----+ 
// | timestamp|value| 
// +----------+-----+ 
// |1441637160| 25.0| 
// |1441637220| 5.0| 
// +----------+-----+ 
1

首先將時間戳映射到分鐘桶,然後使用groupByKey計算平均值。例如:

rdd.map(x=>{val round = x._1%60; (x._1-round, x._2);}) 
.groupByKey 
.map(x=>(x._1, (x._2.sum.toDouble/x._2.size))) 
.collect()