I have an array of data like this, and I want to combine it by iterating over the array in Scala:
tagid,timestamp,listner,orgid,suborgid,rssi
[4,1496745915,718,4,3,0.30]
[2,1496745915,3878,4,3,0.20]
[4,1496745918,362,4,3,0.60]
[4,1496745913,362,4,3,0.60]
I want to loop over this array and find, for each tagid & listener, the data whose timestamp falls within the latest 10 seconds. This is my code:
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import scala.collection.mutable.WrappedArray

class CollectReadings extends UserDefinedAggregateFunction { // example class name
  // input columns fed to the aggregate
  override def inputSchema: StructType =
    StructType(StructField("time", StringType) :: StructField("tagid", StringType) ::
      StructField("listener", StringType) :: StructField("rssi", StringType) :: Nil)

  // intermediate buffer: one growing array of "time;tagid;listener" strings
  override def bufferSchema: StructType =
    StructType(StructField("rows", ArrayType(StringType)) :: Nil)

  override def dataType: DataType = ArrayType(StringType)
  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Array[String]()
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    // append "time;tagid;listener" for the incoming row
    buffer(0) = buffer.getAs[WrappedArray[String]](0) :+
      (input.getAs[String](0) + ";" + input.getAs[String](1) + ";" + input.getAs[String](2))
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[WrappedArray[String]](0) ++ buffer2.getAs[WrappedArray[String]](0)
  }

  override def evaluate(buffer: Row): Any = {
    val in_array = buffer.getAs[WrappedArray[String]](0)
    in_array
  }
}
in_array contains all the data. I don't know how to proceed further. Any help would be appreciated.
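One way to finish `evaluate` is to parse each `time;tagid;listener` string that `update` built, group by (tagid, listener), and keep only the timestamps within 10 seconds of the newest one for that key. A minimal plain-Scala sketch of that step (the helper name `latestWithin10s` is my own, not from the question):

```scala
// Each entry looks like "time;tagid;listener", as built in update().
def latestWithin10s(entries: Seq[String]): Map[(String, String), Seq[Long]] = {
  val parsed = entries.map { s =>
    val Array(time, tagid, listener) = s.split(";")
    (tagid, listener, time.toLong)
  }
  parsed
    .groupBy { case (tagid, listener, _) => (tagid, listener) }
    .map { case (key, rows) =>
      val times  = rows.map(_._3)
      val newest = times.max
      // keep only timestamps no more than 10 seconds older than the newest
      key -> times.filter(t => newest - t <= 10).sorted
    }
}
```

Calling this on `in_array` inside `evaluate` (and returning, say, the flattened result) would give you the latest-10-second readings per tag/listener pair.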
Yeah, that question was also asked by me. I'm not sure what the Window in the code above is; it gives me an error. Do I need to import anything? –
You need to import org.apache.spark.sql.expressions.Window –
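For reference, the per-key "latest 10 seconds" problem is usually simpler with a window function than with a custom UDAF. A sketch assuming a SparkSession is available and using the column names from the sample data (this is an alternative approach, not the asker's code):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, max}

val spark = SparkSession.builder().appName("latest-per-key").getOrCreate()
import spark.implicits._

val df = Seq(
  (4, 1496745915L, 718, 0.30),
  (2, 1496745915L, 3878, 0.20),
  (4, 1496745918L, 362, 0.60),
  (4, 1496745913L, 362, 0.60)
).toDF("tagid", "timestamp", "listner", "rssi")

// attach the newest timestamp per (tagid, listner) to every row of that group
val w = Window.partitionBy("tagid", "listner")
val latest = df
  .withColumn("maxts", max(col("timestamp")).over(w))
  // keep rows within 10 seconds of the newest reading for that key
  .filter(col("maxts") - col("timestamp") <= 10)
  .drop("maxts")
```

Here every row of the sample survives the filter, since no reading is more than 10 seconds older than its group's newest; widen the gap in the data and the stale rows drop out.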