2017-07-11 55 views

I need help with the logic. I have records like the following and need to iterate over them within a bounded range:

tag,timestamp,listner,org,suborg,rssi        
4,101,1901,4,3,0.60                                          
4,110,1901,4,3,0.90 
4,104,1901,4,3,0.30 
4,109,1901,4,3,0.40 
4,111,1901,4,3,0.60               
4,128,1901,4,3,0.40 
4,129,1901,4,3,0.80 
4,131,1901,4,3,0.60                 
4,133,1901,4,3,0.30 
4,143,1901,4,3,0.60                 
4,147,1901,4,3,0.70 
4,148,1901,4,3,0.40 
4,149,1901,4,3,0.30 
4,150,1901,4,3,0.90 

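For each row, I have to find the average of the rssi column over the rows whose timestamps fall within the preceding 10 seconds.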

This is my expected output:

tagShortID,timestamp,listenerShortID,rootOrgID,subOrgID,rssi_Weight,rssi_Weight_avg 
4,150,1901,4,3,0.9,0.58 
4,149,1901,4,3,0.3,0.5 
4,148,1901,4,3,0.4,0.56 
4,147,1901,4,3,0.7,0.64 
4,143,1901,4,3,0.6,0.44 
4,133,1901,4,3,0.3,0.525 
4,131,1901,4,3,0.6,0.6 
4,129,1901,4,3,0.8,0.6 
4,128,1901,4,3,0.4,0.4 
4,111,1901,4,3,0.6,0.6 
4,110,1901,4,3,0.9,0.9 
4,109,1901,4,3,0.4,0.4 
4,104,1901,4,3,0.3,0.3 
4,101,1901,4,3,0.6,0.6 

I tried this:

df.withColumn("firstValue", first("Timestamp") over Window.orderBy($"Timestamp".desc).partitionBy("tagShortID", "ListenerShortID")) 
.filter($"firstValue".cast("long")-$"Timestamp".cast("long") <= 10) 
.withColumn("count", count("Timestamp") over Window.partitionBy("tagShortID", "ListenerShortID")) 
.withColumn("RSSI_Weight", when($"count" >= 10, avg($"RSSI_Weight") over Window.orderBy("Timestamp").partitionBy("tagShortID", "ListenerShortID").rowsBetween(Long.MinValue, 0)) otherwise($"RSSI_Weight")) 
.drop("firstValue", "count") 
.show(30, false) 

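The code above only takes the highest timestamp and subtracts 10 seconds from it. What I need is to iterate over every timestamp and check the 10 seconds before it: if there is a full 10 seconds of data, take the average; otherwise use the row's own rssi value.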
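
To make the rule concrete, here is a minimal plain-Scala sketch (no Spark). It rests on one reading of the expected output that the post does not state outright: a row is averaged over [t - 10, t] only when the same tag/listener group also has a reading older than t - 10; otherwise it keeps its own rssi. `Reading` and `rssiAvg` are hypothetical names introduced only for illustration.

```scala
// Hypothetical record type; the names are illustrative, not from the original post.
final case class Reading(timestamp: Long, rssi: Double)

// (timestamp, rssi) pairs for tag 4 / listener 1901, copied from the sample data.
val readings: Seq[Reading] = Seq(
  101L -> 0.6, 104L -> 0.3, 109L -> 0.4, 110L -> 0.9, 111L -> 0.6,
  128L -> 0.4, 129L -> 0.8, 131L -> 0.6, 133L -> 0.3, 143L -> 0.6,
  147L -> 0.7, 148L -> 0.4, 149L -> 0.3, 150L -> 0.9
).map { case (t, r) => Reading(t, r) }

// Average rssi over [t - 10, t] when some reading is older than t - 10;
// otherwise fall back to the reading's own rssi (assumed rule, see above).
def rssiAvg(all: Seq[Reading], current: Reading): Double = {
  val windowStart = current.timestamp - 10L
  val inWindow = all.filter(r => r.timestamp >= windowStart && r.timestamp <= current.timestamp)
  val hasOlderReading = all.exists(_.timestamp < windowStart)
  if (hasOlderReading) inWindow.map(_.rssi).sum / inWindow.size
  else current.rssi
}

// Five readings fall in [140, 150] and 133 is older, so this is roughly 0.58.
val at150 = rssiAvg(readings, readings.find(_.timestamp == 150L).get)
// Nothing is older than 101, so the row keeps its own value, 0.6.
val at111 = rssiAvg(readings, readings.find(_.timestamp == 111L).get)
```

Under this reading, timestamp 111 stays at 0.6 even though five readings lie in [101, 111], which matches the expected output.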

Any help would be appreciated.

Answers


You can apply the following logic on an RDD to get the DataFrame you need:

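// Appends the 10-second average (or the row's own rssi) to the row.
// `buffer` must hold the rows of one group sorted by timestamp, newest first.
def avgCalc(buffer: Iterable[Array[String]], list: Array[String]) = {
    val currentTimeStamp = list(1).toLong
    var sum = 0.0
    var count = 0
    var check = false   // becomes true once a row older than the window is seen
    import scala.util.control.Breaks._
    breakable {
        for (array <- buffer) {
            val toCheckTimeStamp = array(1).toLong
            // Row lies inside [currentTimeStamp - 10, currentTimeStamp]: include it.
            if (((currentTimeStamp - 10L) <= toCheckTimeStamp) && (currentTimeStamp >= toCheckTimeStamp)) {
                sum += array(5).toDouble
                count += 1
            }
            // Row is older than the window; with newest-first order nothing
            // further can fall inside the window, so stop scanning.
            if ((currentTimeStamp - 10L) > toCheckTimeStamp) {
                check = true
                break
            }
        }
    }
    // Use the average only if an older row was seen (and the sum is non-zero);
    // otherwise fall back to the row's own rssi.
    if (sum != 0.0 && check) list :+ sum/count
    else list :+ list(5).toDouble
}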

import sqlContext.implicits._
// Assumes the CSV file has no header row (a header would fail in toLong).
val averageDF = sc.textFile("path to your csv file")
    .map(line => line.split(",").map(_.trim))
    .sortBy(array => array(1).toLong, false)    // newest first; numeric key, not the raw string
    .groupBy(array => (array(0), array(2)))     // one group per (tag, listener)
    .mapValues(buffer => {
        buffer.map(list => {
            avgCalc(buffer, list)
        })
    })
    .flatMap(x => x._2)
    .map(x => Jessi(x(0).toString, x(1).toString.toLong, x(2).toString, x(3).toString, x(4).toString, x(5).toString.toDouble, x(6).toString.toDouble))
    .toDF

averageDF.show

where Jessi is a case class:

case class Jessi(tagShortID: String, Timestamp: Long, ListenerShortID: String, rootOrgID: String, subOrgID: String, RSSI_Weight: Double, RSSI_Weight_avg: Double) 
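
avgCalc stops scanning at the first row older than the window, so each group has to arrive in descending numeric timestamp order. The small sketch below (plain Scala, made-up timestamps) illustrates why the sort key should be the parsed number rather than the raw string field. The sample data happens to use three-digit timestamps only, where both orders agree.

```scala
// Timestamps as they come out of line.split(","): plain strings (made-up values).
val raw = Seq("99", "150", "101")

// Descending sort on the string compares character by character, so "99" ranks first.
val byString = raw.sortBy(identity)(Ordering[String].reverse)

// Descending sort on the parsed Long gives true newest-first order.
val byNumber = raw.sortBy(_.toLong)(Ordering[Long].reverse)

println(byString) // List(99, 150, 101)
println(byNumber) // List(150, 101, 99)
```

In the RDD pipeline the corresponding sort key would be `array(1).toLong`, passed to `sortBy` with `ascending = false`.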

You should then get the following output:

+----------+---------+---------------+---------+--------+-----------+-------------------+
|tagShortID|Timestamp|ListenerShortID|rootOrgID|subOrgID|RSSI_Weight|RSSI_Weight_avg    |
+----------+---------+---------------+---------+--------+-----------+-------------------+
|4         |150      |1901           |4        |3       |0.9        |0.58               |
|4         |149      |1901           |4        |3       |0.3        |0.5                |
|4         |148      |1901           |4        |3       |0.4        |0.5666666666666668 |
|4         |147      |1901           |4        |3       |0.7        |0.6499999999999999 |
|4         |143      |1901           |4        |3       |0.6        |0.44999999999999996|
|4         |133      |1901           |4        |3       |0.3        |0.525              |
|4         |131      |1901           |4        |3       |0.6        |0.6                |
|4         |129      |1901           |4        |3       |0.8        |0.6000000000000001 |
|4         |128      |1901           |4        |3       |0.4        |0.4                |
|4         |111      |1901           |4        |3       |0.6        |0.6                |
|4         |110      |1901           |4        |3       |0.9        |0.9                |
|4         |109      |1901           |4        |3       |0.4        |0.4                |
|4         |104      |1901           |4        |3       |0.3        |0.3                |
|4         |101      |1901           |4        |3       |0.6        |0.6                |
+----------+---------+---------------+---------+--------+-----------+-------------------+
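
As a possible alternative that stays in the DataFrame API, the same numbers can probably be reached with a range-based window frame. This is only a sketch, not part of the answer above. It assumes Spark 2.x or later (for `SparkSession`) and a numeric Timestamp column. It also replaces avgCalc's "older row seen" flag with a comparison against the group's minimum timestamp, and drops avgCalc's extra `sum != 0.0` check. The names `perGroup` and `last10Seconds` are illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{avg, col, min, when}

// Assumption: Spark 2.x+; in spark-shell a `spark` session already exists.
val spark = SparkSession.builder().master("local[*]").appName("rssi-window-sketch").getOrCreate()
import spark.implicits._

// The sample rows from the question, with Timestamp as Long.
val df = Seq(
  ("4", 101L, "1901", "4", "3", 0.6), ("4", 104L, "1901", "4", "3", 0.3),
  ("4", 109L, "1901", "4", "3", 0.4), ("4", 110L, "1901", "4", "3", 0.9),
  ("4", 111L, "1901", "4", "3", 0.6), ("4", 128L, "1901", "4", "3", 0.4),
  ("4", 129L, "1901", "4", "3", 0.8), ("4", 131L, "1901", "4", "3", 0.6),
  ("4", 133L, "1901", "4", "3", 0.3), ("4", 143L, "1901", "4", "3", 0.6),
  ("4", 147L, "1901", "4", "3", 0.7), ("4", 148L, "1901", "4", "3", 0.4),
  ("4", 149L, "1901", "4", "3", 0.3), ("4", 150L, "1901", "4", "3", 0.9)
).toDF("tagShortID", "Timestamp", "ListenerShortID", "rootOrgID", "subOrgID", "RSSI_Weight")

// Whole group, used to find the oldest timestamp per (tag, listener).
val perGroup = Window.partitionBy("tagShortID", "ListenerShortID")
// Range frame: rows whose Timestamp lies in [current - 10, current].
val last10Seconds = perGroup.orderBy("Timestamp").rangeBetween(-10L, 0L)

val result = df.withColumn(
  "RSSI_Weight_avg",
  // Average only when the group has a row older than the 10-second window.
  when(min("Timestamp").over(perGroup) < col("Timestamp") - 10,
       avg("RSSI_Weight").over(last10Seconds))
    .otherwise(col("RSSI_Weight")))

result.orderBy(col("Timestamp").desc).show(false)
```

Unlike `rowsBetween`, which counts rows, `rangeBetween` bounds the frame by the value of the ordering column, which is what a "last 10 seconds" window needs.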
Great solution.

If it helped, please accept the answer :)