2015-09-11 45 views
0

如果我有一個每分鐘具有卷的RDD,例如將時間序列數據映射到以前的數據點和平均值

(("12:00" -> 124), ("12:01" -> 543), ("12:02" -> 102), ...) 

我想去約映射,要在這一分鐘通氣量,前一分鐘,前5分鐘平均成交量的體積數據集。例如。

(("12:00" -> (124, 300, 245.3)), 
("12:01" -> (543, 124, 230.2)), 
("12:02" -> (102, 543, 287.1))) 

輸入RDD可以是RDD[(DateTime, Int)]和輸出RDD[(DateTime, (Int, Int, Float))]

有什麼好方法可以做到這一點?

+0

你的資料已完成或有可能缺失了一些資料? – zero323

+0

可能存在差距,我會默認爲零。我不介意解決方案是否處理此問題。 –

+0

在純scala中,我將轉換爲DateTime並使用SortedMap。你的數據集有多大? – Reactormonk

回答

3

轉換爲數據幀,並使用窗口函數可以覆蓋滯後,平均和可能的間隙:

import com.github.nscala_time.time.Imports._ 
import org.apache.spark.sql.Row 
import org.apache.spark.sql.functions.{lag, avg, when} 
import org.apache.spark.sql.expressions.Window 

val fmt = DateTimeFormat.forPattern("HH:mm:ss") 

val rdd = sc.parallelize(Seq(
    ("12:00:00" -> 124), ("12:01:00" -> 543), ("12:02:00" -> 102), 
    ("12:30:00" -> 100), ("12:31:00" -> 101) 
).map{case (ds, vol) => (fmt.parseDateTime(ds), vol)}) 

val df = rdd 
    // Convert to millis for window range 
    .map{case (dt, vol) => (dt.getMillis, vol)} 
    .toDF("ts", "volume") 

val w = Window.orderBy($"ts") 

val transformed = df.select(
    $"ts", $"volume", 
    when(
    // Check if we have data from the previous minute 
    (lag($"ts", 1).over(w) - $"ts").equalTo(-60000), 
    // If so get lag otherwise 0 
    lag($"volume", 1).over(w)).otherwise(0).alias("previous_volume"), 
    // Average over window 
    avg($"volume").over(w.rangeBetween(-300000, 0)).alias("average")) 

// Optionally go to back to RDD 
transformed.map{ 
    case Row(ts: Long, volume: Int, previousVolume: Int, average: Double) => 
    (new DateTime(ts) -> (volume, previousVolume, average)) 
} 

要知道,無窗劃分窗口函數是相當低效的。

相關問題