2017-08-30 49 views
1

是否有一種乾淨的方式來計算Spark Dataframe上的移動百分位數。在Spark Dataframe上移動百分位數

我有一個巨大的數據框,我每隔15分鐘彙總一次,我想計算每個部分的百分位數。

df.groupBy(window(col("date").cast("timestamp"), "15 minutes")) 
    .agg(sum("session"),mean("session"),percentile_approx("session", 0.5)) 
    .show() 

錯誤:未發現:價值percentile_approx

所以我要像計算總和與平均值基本的東西,但我需要計算中位數和其他一些百分點。

在Spark 2.1中有這樣做的有效方法嗎?

因爲在這裏,沒有平均percentile_approx,似乎API中實現Percentile_approx功能。

我看到這個問題已經被問到,但答案並不是都同意一個獨特的解決方案。對我來說這很模糊......所以我想知道在2017年8月,是否有一個好的和有效的解決方案。

而當我瀏覽15分鐘的窗口時,我想知道如果僅僅用硬計算它不會工作而不是近似值?

非常感謝您的關注,

祝大家下午好! PS:Scala或PySpark我不介意,兩者都會更大!

+0

示例代碼中的「window」是什麼?你想要窗口函數(和滑動窗口)還是非重疊窗口(groupBy)? –

+0

感謝您的回答,並花時間回答我的問題!我有歷史數據,我想每1分鐘彙總一次。每分鐘,我有數百條記錄,並在每個滑動窗口(每分鐘)我需要計算中位數等...所以我想知道什麼是乾淨的方式來有效地做到這一點 – tricky

+0

好吧,但在這種情況下窗口並不真正「滑動」......因爲對於滑動窗口,您需要窗口函數。AFAIK滑動意味着你的情況:對於每個記錄,採取「周圍」15分鐘的數據並計算聚合 –

回答

1

好了,所以我是非常愚蠢的,我猜。

我不得不callUDF添加到我以前的想法:percentile_approx。對不起,我不同意

callUDF("percentile_approx", col("session"), lit(0.5)) 

因此,例如,在我的情況,我想聚合每分鐘2個月曆史數據集:

df.groupBy(window((col("date")/1000).cast("timestamp"), "1 minutes")) 
.agg(sum("session"),mean("session"),callUDF("percentile_approx", col("session"), lit(0.5))) 
.show() 

(在milisecond時間戳因此/1000

+0

多數民衆贊成在很大程度上,我不知道你可以使用百分比作爲聚合功能! –

+0

只是爲了澄清:percentile_approx(你也可以使用percentile)是一個內置的HIVE UDAF(https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-Built-inAggregateFunctions(UDAF) )),所以它不是由Spark實現的,而是由Hive實現的(如果你有hive-support(或hiveContext),你只能使用它。 –

1

如果你不需要滑動(重疊)窗口,你可以用groupBy來完成。據我所知,沒有百分聚合功能,讓你無論是要實現自己的UDAF或用下面的辦法:

val df = (1 to 100).map(i => (
    i/10, scala.util.Random.nextDouble) 
).toDF("time","session") 

val calcStats = udf((data:Seq[Double]) => { 
    (data.sum, 
    data.sum/data.size, 
    data.sorted.apply(data.size/2) // is ~ median, replace with your desired logic 
) 
}) 

df.groupBy($"time") 
    .agg(collect_list($"session").as("sessions")) 
    .withColumn("stats",calcStats($"sessions").cast("struct<sum:double,mean:double,median:double>")) 
    .select($"time",$"stats.*") 
    .orderBy($"time") 
    .show 

+----+------------------+-------------------+-------------------+ 
|time|    sum|    mean|    median| 
+----+------------------+-------------------+-------------------+ 
| 0|3.5441618790222287| 0.3937957643358032| 0.3968893251191352| 
| 1|3.6612518806543757| 0.3661251880654376| 0.4395039388994335| 
| 2| 4.040992655970037|0.40409926559700365| 0.3522214051715915| 
| 3| 4.583175830988081| 0.4583175830988081| 0.5800394949546751| 
| 4| 3.849409207658501| 0.3849409207658501|0.43422232330495936| 
| 5| 5.514681139649785| 0.5514681139649784| 0.6703416471647694| 
| 6| 4.890227540935781| 0.4890227540935781| 0.5515164635420178| 
| 7|4.1148083531280095|0.41148083531280094| 0.4384132796986667| 
| 8| 5.723834881155167| 0.5723834881155166| 0.6415902834329499| 
| 9| 5.559212938582014| 0.5559212938582014| 0.6816268800227596| 
| 10|0.8867335786067405| 0.8867335786067405| 0.8867335786067405| 
+----+------------------+-------------------+-------------------+ 
+0

感謝您的幫助!你的回答幫助我更好地理解UDF,祝你有美好的一天! – tricky