2016-07-06 19 views
4

我的原始數據以表格格式顯示。它包含來自不同變量的觀測值。每個觀察都帶有變量名稱,時間戳和當時的值。PySpark:檢索數據幀內組的平均值和數值的計數

變量[字符串],時間[日期時間],值[浮子]

的數據被存儲作爲鑲木在HDFS並裝入火花數據幀(DF)。從那個數據幀。

現在我想計算每個變量的默認統計量,如平均值,標準偏差等。之後,一旦Mean被檢索出來,我想過濾/計算那些非常接近Mean的變量值。

因此我需要首先得到每個變量的均值。這就是爲什麼我使用GroupBy來獲取每個變量的統計信息(而不是整個數據集)。

df_stats = df.groupBy(df.Variable).agg(\ 
    count(df.Variable).alias("count"), \ 
    mean(df.Value).alias("mean"), \ 
    stddev(df.Value).alias("std_deviation")) 

使用每個變量的均值我可以過濾那些特定變量的均值附近的值(只是計數)。因此我需要該變量的所有觀察值(值)。這些值位於原始數據幀df中,而不在聚合/分組數據幀df_stats中。

creating statistics

最後我想一個數據幀像聚合/分組df_stats一個新列「count_around_mean」

我想使用df_stats.map(...)或df_stats.join(df,df.Variable)。但是我卡上的紅色箭頭:(

提問:你會如何意識到

臨時解決方案:同時我使用的是基於您的想法的解決方案但對於STDDEV範圍2和3的範圍,功能無法正常工作。它總是產生一個

AttributeError的話說零式無_jvm

from pyspark.sql.window import Window 
from pyspark.sql.functions import * 
from pyspark.sql.types import * 

w1 = Window().partitionBy("Variable") 
w2 = Window.partitionBy("Variable").orderBy("Time") 

def stddev_pop_w(col, w): 
    #Built-in stddev doesn't support windowing 
    return sqrt(avg(col * col).over(w) - pow(avg(col).over(w), 2)) 

def isInRange(value, mean, stddev, radius): 
    try: 
     if (abs(value - mean) < radius * stddev): 
      return 1 
     else: 
      return 0 
    except AttributeError: 
     return -1 

delta = col("Time").cast("long") - lag("Time", 1).over(w2).cast("long") 
#f = udf(lambda (value, mean, stddev, radius): abs(value - mean) < radius * stddev, IntegerType()) 
f2 = udf(lambda value, mean, stddev: isInRange(value, mean, stddev, 2), IntegerType()) 
f3 = udf(lambda value, mean, stddev: isInRange(value, mean, stddev, 3), IntegerType()) 

df \ 
    .withColumn("mean", mean("Value").over(w1)) \ 
    .withColumn("std_deviation", stddev_pop_w(col("Value"), w1)) \ 
    .withColumn("delta", delta) 
    .withColumn("stddev_2", f2("Value", "mean", "std_deviation")) \ 
    .withColumn("stddev_3", f3("Value", "mean", "std_deviation")) \ 
    .show(5, False) 

#df2.withColumn("std_dev_3", stddev_range(col("Value"), w1)) \ 
+0

至少對我而言,這並不清楚你在這裏真正想要什麼。你是什​​麼意思_完整的時間系列_?你想要應用什麼樣的功能? – zero323

+0

爲了更好地理解,我重新編寫了原始問題。同時我找到了一個很好的方法,我明天需要測試。然後我會在這裏上傳這個想法。 – Matthias

回答

2

星火2.0+

你可以用一個取代stddev_pop_w內置pyspark.sql.functions.stddev*功能。

星火< 2.0

一般來說,沒有必要與加盟聚集。相反,您可以計算統計數據,而無需使用窗口函數摺疊行。假設你的數據是這樣的:

import numpy as np 
import pandas as pd 
from pyspark.sql.functions import mean 

n = 10000 
k = 20 

np.random.seed(100) 

df = sqlContext.createDataFrame(pd.DataFrame({ 
    "id": np.arange(n), 
    "variable": np.random.choice(k, n), 
    "value": np.random.normal(0, 1, n) 
})) 

您可以通過variable定義與分區窗口:

from pyspark.sql.window import Window 

w = Window().partitionBy("variable") 

和計算統計如下:

from pyspark.sql.functions import avg, pow, sqrt 

def stddev_pop_w(col, w): 
    """Builtin stddev doesn't support windowing 
    You can easily implement sample variant as well 
    """ 
    return sqrt(avg(col * col).over(w) - pow(avg(col).over(w), 2)) 


(df 
    .withColumn("stddev", stddev_pop_w(col("value"), w)) 
    .withColumn("mean", avg("value").over(w)) 
    .show(5, False)) 

## +---+--------------------+--------+------------------+--------------------+ 
## |id |value    |variable|stddev   |mean    | 
## +---+--------------------+--------+------------------+--------------------+ 
## |47 |0.77212446947439 |0  |1.0103781346123295|0.035316745261099715| 
## |60 |-0.931463439483327 |0  |1.0103781346123295|0.035316745261099715| 
## |86 |1.0199074337552294 |0  |1.0103781346123295|0.035316745261099715| 
## |121|-1.619408643898953 |0  |1.0103781346123295|0.035316745261099715| 
## |145|-0.16065930935765935|0  |1.0103781346123295|0.035316745261099715| 
## +---+--------------------+--------+------------------+--------------------+ 
## only showing top 5 rows 

就比較聚集體加入:

from pyspark.sql.functions import stddev, avg, broadcast 

df.join(
    broadcast(df.groupBy("variable").agg(avg("value"), stddev("value"))), 
    ["variable"] 
) 
+1

一如既往的好和聰明的答案。謝謝!明天我會試試。我也在考慮使用Window函數來獲得兩行之間的時間差(滯後),並且還指出q行是否屬於均值附近的合理範圍。 – Matthias

+0

我只是想知道如果調用柱/窗後兩次是有效的?您可以使用groupBy/mapValues/StatCounter來一步獲取這些值。結果表格被壓縮(分組),但可以合併到原始表格中。 – Matthias

+0

您使用列調用的次數不會影響執行計劃。如果你不喜歡窗口函數,你可能更喜歡聚集和加入。關於通過RDD,這意味着多重安全不安全的轉換和繁重的IO + serde。 – zero323