2016-01-25 57 views
1

讓說我有一個數據幀(存儲在斯卡拉VAL爲df),其中包含從CSV數據:火花(斯卡拉)數據幀過濾(FIR)

time,temperature 
0,65 
1,67 
2,62 
3,59 

,我有沒有問題,從文件中讀取該作爲scala語言的火花數據框。

我想補充一個過濾柱(由過濾器我的意思是信號處理的移動平均濾波),(說我想要做(T[n]+T[n-1])/2.0):

time,temperature,temperatureAvg 
0,65,(65+0)/2.0 
1,67,(67+65)/2.0 
2,62,(62+67)/2.0 
3,59,(59+62)/2.0 

(其實,說第一行,我想要32.5而不是(65+0)/2.0。我寫了它來澄清預期的2-time-step過濾操作輸出)

那麼如何實現這個呢?我不熟悉的火花數據幀操作沿柱反覆結合的行...

回答

5

星火2.0+

火花2.0及更高版本,可以使用window功能爲groupBy一個輸入。它允許您指定windowDurationslideDurationstartTime(偏移量)。它只適用於TimestampType列,但不難找到解決方法。你的情況,這將需要一些額外的步驟來糾正界限,而通用的解決方案可以表述爲如下圖所示:

import org.apache.spark.sql.functions.{window, avg} 

df 
    .withColumn("ts", $"time".cast("timestamp")) 
    .groupBy(window($"ts", windowDuration="2 seconds", slideDuration="1 second")) 
    .avg("temperature") 

星火< 2.0

如果分割你的數據,你可以很自然地使用窗口功能如下:

import org.apache.spark.sql.expressions.Window 
import org.apache.spark.sql.functions.mean 

val w = Window.partitionBy($"id").orderBy($"time").rowsBetween(-1, 0) 

val df = sc.parallelize(Seq(
    (1L, 0, 65), (1L, 1, 67), (1L, 2, 62), (1L, 3, 59) 
)).toDF("id", "time", "temperature") 

df.select($"*", mean($"temperature").over(w).alias("temperatureAvg")).show 

// +---+----+-----------+--------------+        
// | id|time|temperature|temperatureAvg| 
// +---+----+-----------+--------------+ 
// | 1| 0|   65|   65.0| 
// | 1| 1|   67|   66.0| 
// | 1| 2|   62|   64.5| 
// | 1| 3|   59|   60.5| 
// +---+----+-----------+--------------+ 

可以使用lead/lag功能創建任意砝碼窗口:

lit(0.6) * $"temperature" + 
lit(0.3) * lag($"temperature", 1) + 
lit(0.2) * lag($"temperature", 2) 

不存在partitionBy條款,但仍然是可能的,但效率極低。如果是這種情況,您將無法使用DataFrames。相反,您可以在RDD上使用sliding(請參閱Operate neighbor elements in RDD in Spark)。還有spark-timeseries包可能會發現有用。

+0

謝謝零!它工作得很好。一個相關的問題,如果我想做過濾而不是'0.5T [n] + 0.5T [n-1]'而想要'0.6T [n] + 0.3T [n-1] + 0.1T [n -2]'其中'T [n]'是第n行的溫度? –

+1

而不是「平均」使用「滯後」和「鉛」。 – zero323