2017-03-01 37 views

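How to map adjacent elements in Scala

I have an RDD[String] whose lines have the format device,timestamp,on/off. How can I calculate the amount of time each device was switched on? What is the best way to do this in Spark?

On is 1 and off is 0.

E.g.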

A,1335952933,1 
A,1335953754,0 
A,1335994294,1 
A,1335995228,0 
B,1336001513,1 
B,1336002622,0 
B,1336006905,1 
B,1336007462,0 

Intermediate step 1

A,((1335953754 - 1335952933),(1335995228 - 1335994294)) 
B,((1336002622 - 1336001513),(1336007462 - 1336006905)) 

Intermediate step 2

(A,(821,934)) 
(B,(1109,557)) 

Output

(A,1755) 
(B,1666) 

Answers


I assume the RDD[String] can be parsed into an RDD of DeviceLog, where DeviceLog is:

case class DeviceLog(id: String, timestamp: Long, onoff: Int) 

The DeviceLog class is pretty straightforward.
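The parsing step is only assumed above, so here is a minimal sketch of what it might look like. The `DeviceLogParser` object and its `parseLine` method are hypothetical names introduced for illustration; the sketch assumes every valid line has exactly three comma-separated fields and silently drops anything else.

```scala
import scala.util.Try

// Same shape as the DeviceLog case class above, repeated so the sketch compiles on its own.
case class DeviceLog(id: String, timestamp: Long, onoff: Int)

object DeviceLogParser {
  // Hypothetical helper: parses one "device,timestamp,onoff" line.
  // Returns None when the line does not have exactly three fields,
  // when the ID is empty, or when a numeric field cannot be converted.
  def parseLine(line: String): Option[DeviceLog] =
    line.split(",").map(_.trim) match {
      case Array(id, ts, state) if id.nonEmpty =>
        Try(DeviceLog(id, ts.toLong, state.toInt)).toOption
      case _ => None
    }
}
```

On the RDD itself this could probably be applied with something like `rdd.flatMap(line => DeviceLogParser.parseLine(line).toList)`, which keeps only the lines that parse cleanly. Whether malformed lines should be dropped or reported is a design choice the question does not settle.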

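import org.apache.spark.SparkContext 
import org.apache.spark.sql.hive.HiveContext 

// initialize contexts (conf is assumed to be an existing SparkConf) 
val sc = new SparkContext(conf) 
val sqlContext = new HiveContext(sc) 
import sqlContext.implicits._ // needed for toDF() and the $"column" syntax 

These initialize the Spark context and the SQL context that we will use for the DataFrames.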

Step 1

val input = List(
    DeviceLog("A",1335952933,1), 
    DeviceLog("A",1335953754,0), 
    DeviceLog("A",1335994294,1), 
    DeviceLog("A",1335995228,0), 
    DeviceLog("B",1336001513,1), 
    DeviceLog("B",1336002622,0), 
    DeviceLog("B",1336006905,1), 
    DeviceLog("B",1336007462,0)) 

val df = input.toDF() 
df.show() 
+---+----------+-----+ 
| id| timestamp|onoff| 
+---+----------+-----+ 
| A|1335952933| 1| 
| A|1335953754| 0| 
| A|1335994294| 1| 
| A|1335995228| 0| 
| B|1336001513| 1| 
| B|1336002622| 0| 
| B|1336006905| 1| 
| B|1336007462| 0| 
+---+----------+-----+ 

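Step 2: partition by device ID and order by timestamp, so that each row is paired with the previous row's timestamp (spend) and on/off state (one)

import org.apache.spark.sql.expressions.Window 
import org.apache.spark.sql.functions.lag 

val wSpec = Window.partitionBy("id").orderBy("timestamp") 

val df1 = df 
    .withColumn("spend", lag("timestamp", 1).over(wSpec)) 
    .withColumn("one", lag("onoff", 1).over(wSpec)) 
    .where($"spend".isNotNull) 
df1.show() 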

+---+----------+-----+----------+---+ 
| id| timestamp|onoff|  spend|one| 
+---+----------+-----+----------+---+ 
| A|1335953754| 0|1335952933| 1| 
| A|1335994294| 1|1335953754| 0| 
| A|1335995228| 0|1335994294| 1| 
| B|1336002622| 0|1336001513| 1| 
| B|1336006905| 1|1336002622| 0| 
| B|1336007462| 0|1336006905| 1| 
+---+----------+-----+----------+---+ 

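Step 3: compute the up time and filter by the criteria. A row is kept only when the previous state was on (1) and the current state is off (0), i.e. when one - onoff equals 1.

val df2 = df1 
    .withColumn("upTime", $"timestamp" - $"spend") 
    .withColumn("criteria", $"one" - $"onoff") 
    .where($"criteria" === 1) 
df2.show() 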

+---+----------+-----+----------+---+------+--------+ 
| id| timestamp|onoff|  spend|one|upTime|criteria| 
+---+----------+-----+----------+---+------+--------+ 
| A|1335953754| 0|1335952933| 1| 821|  1| 
| A|1335995228| 0|1335994294| 1| 934|  1| 
| B|1336002622| 0|1336001513| 1| 1109|  1| 
| B|1336007462| 0|1336006905| 1| 557|  1| 
+---+----------+-----+----------+---+------+--------+ 

Step 4: group by ID and sum the up time

import org.apache.spark.sql.functions.sum 

val df3 = df2.groupBy($"id").agg(sum("upTime")) 
df3.show() 

+---+-----------+ 
| id|sum(upTime)| 
+---+-----------+ 
| A|  1755| 
| B|  1666| 
+---+-----------+ 

Thank you for your answer. Is there any way to do this with RDD operations only, without DataFrames? –


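I think it can be done, but DataFrames offer more flexibility for this kind of problem. Also, DataFrames seem to be faster than RDDs in terms of processing. – dumitru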
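For what it's worth, the adjacent-element pairing the question asks about can be sketched without DataFrames using plain Scala collections and `sliding(2)`. This is not the approach from the answer above, only an illustration of the same on-to-off logic. The `UpTime` object and its method names are hypothetical, and the sketch assumes all entries for one device fit in memory.

```scala
// Same shape as the DeviceLog case class in the answer, repeated so the sketch compiles on its own.
case class DeviceLog(id: String, timestamp: Long, onoff: Int)

object UpTime {
  // Hypothetical helper: total "on" time for the log entries of ONE device.
  // Sorts by timestamp, pairs each entry with its successor via sliding(2),
  // and keeps only the on -> off pairs, mirroring the lag/criteria steps.
  def onTime(logs: Seq[DeviceLog]): Long =
    logs.sortBy(_.timestamp)
      .sliding(2)
      .collect { case Seq(prev, cur) if prev.onoff == 1 && cur.onoff == 0 =>
        cur.timestamp - prev.timestamp
      }
      .sum

  // Groups a local collection by device ID and sums the on time per device.
  def perDevice(logs: Seq[DeviceLog]): Map[String, Long] =
    logs.groupBy(_.id).map { case (id, ls) => id -> onTime(ls) }
}
```

On an RDD[DeviceLog], the same per-device function could presumably be plugged in with `rdd.map(l => (l.id, l)).groupByKey().mapValues(ls => UpTime.onTime(ls.toSeq))`. That line is untested here, and `groupByKey` pulls every entry of a device onto a single executor, so it only suits devices with a moderate number of log entries.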