我的原始數據以表格格式顯示。它包含來自不同變量的觀測值。每個觀察都帶有變量名稱,時間戳和當時的值。PySpark:檢索數據幀內組的平均值和數值的計數
變量[字符串],時間[日期時間],值[浮子]
的數據被存儲作爲鑲木在HDFS並裝入火花數據幀(DF)。從那個數據幀。
現在我想計算每個變量的默認統計量,如平均值,標準偏差等。之後,一旦Mean被檢索出來,我想過濾/計算那些非常接近Mean的變量值。
因此我需要首先得到每個變量的均值。這就是爲什麼我使用GroupBy來獲取每個變量的統計信息(而不是整個數據集)。
df_stats = df.groupBy(df.Variable).agg(\
count(df.Variable).alias("count"), \
mean(df.Value).alias("mean"), \
stddev(df.Value).alias("std_deviation"))
使用每個變量的均值我可以過濾那些特定變量的均值附近的值(只是計數)。因此我需要該變量的所有觀察值(值)。這些值位於原始數據幀df中,而不在聚合/分組數據幀df_stats中。
最後我想一個數據幀像聚合/分組df_stats一個新列「count_around_mean」。
我想使用df_stats.map(...)或df_stats.join(df,df.Variable)。但是我卡上的紅色箭頭:(
提問:你會如何意識到
臨時解決方案:同時我使用的是基於您的想法的解決方案但對於STDDEV範圍2和3的範圍,功能無法正常工作。它總是產生一個
AttributeError的話說零式無_jvm
from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.sql.types import *
w1 = Window().partitionBy("Variable")
w2 = Window.partitionBy("Variable").orderBy("Time")
def stddev_pop_w(col, w):
#Built-in stddev doesn't support windowing
return sqrt(avg(col * col).over(w) - pow(avg(col).over(w), 2))
def isInRange(value, mean, stddev, radius):
try:
if (abs(value - mean) < radius * stddev):
return 1
else:
return 0
except AttributeError:
return -1
delta = col("Time").cast("long") - lag("Time", 1).over(w2).cast("long")
#f = udf(lambda (value, mean, stddev, radius): abs(value - mean) < radius * stddev, IntegerType())
f2 = udf(lambda value, mean, stddev: isInRange(value, mean, stddev, 2), IntegerType())
f3 = udf(lambda value, mean, stddev: isInRange(value, mean, stddev, 3), IntegerType())
df \
.withColumn("mean", mean("Value").over(w1)) \
.withColumn("std_deviation", stddev_pop_w(col("Value"), w1)) \
.withColumn("delta", delta)
.withColumn("stddev_2", f2("Value", "mean", "std_deviation")) \
.withColumn("stddev_3", f3("Value", "mean", "std_deviation")) \
.show(5, False)
#df2.withColumn("std_dev_3", stddev_range(col("Value"), w1)) \
至少對我而言,這並不清楚你在這裏真正想要什麼。你是什麼意思_完整的時間系列_?你想要應用什麼樣的功能? – zero323
爲了更好地理解,我重新編寫了原始問題。同時我找到了一個很好的方法,我明天需要測試。然後我會在這裏上傳這個想法。 – Matthias