
Combining by iterating over an array in Scala

I have an array with data like this:

tagid,timestamp,listner,orgid,suborgid,rssi 
[4,1496745915,718,4,3,0.30] 
[2,1496745915,3878,4,3,0.20] 
[4,1496745918,362,4,3,0.60] 
[4,1496745913,362,4,3,0.60] 

I want to loop through this array and find, for each tagid & listner, the data whose timestamp falls within the latest 10 seconds. This is my code:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import scala.collection.mutable.WrappedArray

// columns the UDAF receives for every input row
override def inputSchema: StructType =
  StructType(StructField("time", StringType) :: StructField("tagid", StringType) ::
    StructField("listener", StringType) :: StructField("rssi", StringType) :: Nil)

// start every group with an empty array
override def initialize(buffer: MutableAggregationBuffer): Unit = {
  buffer(0) = Array[String]()
}

// append "time;tagid;listener" for each incoming row
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
  buffer(0) = buffer.getAs[WrappedArray[String]](0) :+
    (input.getAs[String](0) + ";" + input.getAs[String](1) + ";" + input.getAs[String](2))
}

// concatenate the arrays of two partial buffers
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
  buffer1(0) = buffer1.getAs[WrappedArray[String]](0) ++ buffer2.getAs[WrappedArray[String]](0)
}

override def evaluate(buffer: Row): Any = {
  val in_array = buffer.getAs[WrappedArray[String]](0)
}

The in_array contains all the data. I do not know how to proceed from here. Any help would be appreciated.

Answers


I can see you are trying to use a UDAF, which is a nightmare for a beginner. By the way, a UDAF returns one row per group, and getting from the aggregated dataframe back to all of the original rows would be another nightmare.

I am assuming you have the data in a text file like this:

tagid,timestamp,listner,orgid,suborgid,rssi 
4,1496745915,718,4,3,0.30 
2,1496745915,3878,4,3,0.20 
4,1496745918,362,4,3,0.60 
4,1496745913,362,4,3,0.60 

If so, then reading that file into a dataframe is easy:

val df = sqlContext.read.format("csv").option("header", true).load("path to the above file") 
df.show(false) 

This should give you a dataframe like:

+-----+----------+-------+-----+--------+----+
|tagid|timestamp |listner|orgid|suborgid|rssi|
+-----+----------+-------+-----+--------+----+
|4    |1496745915|718    |4    |3       |0.30|
|2    |1496745915|3878   |4    |3       |0.20|
|4    |1496745918|362    |4    |3       |0.60|
|4    |1496745913|362    |4    |3       |0.60|
+-----+----------+-------+-----+--------+----+

Now all you need is to filter, for each tagid and listner, the rows whose timestamp falls within the latest 10 seconds. For that, use the following code:

val windowSpec = Window
  .orderBy($"timestamp".desc)       // latest to come first
  .partitionBy("tagid", "listner")  // grouping data

You will have to put the latest timestamp of each group created above onto every row, so that you can find the time difference. For that, do the following:

val dfWithLatest = df.withColumn("firstValue", first("timestamp") over windowSpec) // first comes from org.apache.spark.sql.functions

This will create a new column:

+-----+----------+-------+-----+--------+----+----------+
|tagid|timestamp |listner|orgid|suborgid|rssi|firstValue|
+-----+----------+-------+-----+--------+----+----------+
|2    |1496745915|3878   |4    |3       |0.20|1496745915|
|4    |1496745915|718    |4    |3       |0.30|1496745915|
|4    |1496745918|362    |4    |3       |0.60|1496745918|
|4    |1496745913|362    |4    |3       |0.60|1496745918|
+-----+----------+-------+-----+--------+----+----------+

The next step is simple: just check whether the time difference is less than 10 seconds and filter on it:

val filtered = dfWithLatest.filter($"firstValue".cast("long") - $"timestamp".cast("long") < 10)

Finally, drop the column that is no longer needed:

filtered.drop("firstValue")
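
Putting these steps together, here is a minimal end-to-end sketch, assuming the same sqlContext, file path placeholder and column names used above:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.first

val df = sqlContext.read.format("csv").option("header", true).load("path to the above file")
import sqlContext.implicits._ // provides the $"columnName" syntax

// latest timestamp comes first within every (tagid, listner) group
val windowSpec = Window.partitionBy("tagid", "listner").orderBy($"timestamp".desc)

val result = df
  .withColumn("firstValue", first("timestamp") over windowSpec)          // latest timestamp of the group
  .filter($"firstValue".cast("long") - $"timestamp".cast("long") < 10)   // keep rows within 10 seconds of it
  .drop("firstValue")                                                    // helper column no longer needed

result.show(false)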

I hope the answer is clear and understandable.

It is clearer if you convert the timestamps to real timestamps.
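
One possible sketch of that conversion, reusing df and windowSpec from above (from_unixtime turns the epoch seconds into a readable timestamp, and the difference column shown below is the gap in seconds from the group's latest timestamp):

import org.apache.spark.sql.functions.{first, from_unixtime}

val readable = df
  .withColumn("firstValue", first("timestamp") over windowSpec)
  .withColumn("difference", $"firstValue".cast("long") - $"timestamp".cast("long"))  // gap in seconds
  .withColumn("timestamp", from_unixtime($"timestamp".cast("long")))                 // epoch seconds -> readable timestamp
  .withColumn("firstValue", from_unixtime($"firstValue".cast("long")))

readable.show(false)

With that, the output would look like the table below: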

+-----+-------------------+-------+-----+--------+----+-------------------+----------+
|tagid|timestamp          |listner|orgid|suborgid|rssi|firstValue         |difference|
+-----+-------------------+-------+-----+--------+----+-------------------+----------+
|2    |2017-06-06 16:30:15|3878   |4    |3       |0.20|2017-06-06 16:30:15|0         |
|4    |2017-06-06 16:30:15|718    |4    |3       |0.30|2017-06-06 16:30:15|0         |
|4    |2017-06-06 16:30:18|362    |4    |3       |0.60|2017-06-06 16:30:18|0         |
|4    |2017-06-06 16:30:13|362    |4    |3       |0.60|2017-06-06 16:30:18|5         |
+-----+-------------------+-------+-----+--------+----+-------------------+----------+

Yeah, that question was also asked by me only. I am not sure what Window is in the code above; it was giving me an error. Do I need to import anything?


You need to import org.apache.spark.sql.expressions.Window


First of all, you are not iterating over an array. Your "array" is actually a schema, and you should define your dataframe accordingly (i.e. each element should be a column). If your dataframe has an array of strings, you can create the columns using a UDF (see here).

Next, you should convert the timestamp to a timestamp type so that it will be ordered correctly.

Finally, you can do an argmax (see here) for each of the columns.
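
A minimal sketch of that argmax idea, assuming a dataframe df with the columns from the question: the double cast turns the epoch-seconds string into a proper timestamp type, and taking the max of a struct whose first field is that timestamp picks the latest row per (tagid, listner) group.

import org.apache.spark.sql.functions.{col, max, struct}

val latestPerGroup = df
  .withColumn("ts", col("timestamp").cast("long").cast("timestamp"))   // epoch seconds -> timestamp type
  .groupBy("tagid", "listner")
  .agg(max(struct(col("ts"), col("rssi"))).as("latest"))               // argmax on ts, carrying rssi along
  .select(col("tagid"), col("listner"), col("latest.ts").as("timestamp"), col("latest.rssi").as("rssi"))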


Assuming this is your array:

val arr = Array((4, 1499340495, 718, 4, 3, 0.30),
                (2, 1496745915, 3878, 4, 3, 0.20),
                (4, 1499340495, 362, 4, 3, 0.60),
                (4, 1496745913, 362, 4, 3, 0.60))

java.time.Instant is available from Java 8:

import java.time.Instant

arr.filter(x => (Instant.now.getEpochSecond - x._2) <= 10)