2017-03-22 127 views

Calculating the grouped median in pyspark

Using pyspark, I'd like to be able to calculate the difference between grouped values and their group's median. Is this possible? Here is some code I hacked together that does what I want, except that it calculates the grouped difference from the mean. Also, any pointers would be appreciated :)

from pyspark import SparkContext 
from pyspark.sql import SparkSession 
from pyspark.sql.types import (
    StringType, 
    LongType, 
    DoubleType, 
    StructField, 
    StructType 
) 
from pyspark.sql import functions as F 


sc = SparkContext(appName='myapp') 
spark = SparkSession(sc) 

file_name = 'data.csv' 

fields = [ 
    StructField(
     'group2', 
     LongType(), 
     True), 
    StructField(
     'name', 
     StringType(), 
     True), 
    StructField(
     'value', 
     DoubleType(), 
     True), 
    StructField(
     'group1', 
     LongType(), 
     True) 
] 
schema = StructType(fields) 

df = spark.read.csv(
    file_name, header=False, mode="DROPMALFORMED", schema=schema 
) 
df.show() 
means = df.groupBy('group1', 'group2').agg(
    F.mean('value').alias('mean_value')
).orderBy('group1', 'group2')

cond = [df.group1 == means.group1, df.group2 == means.group2] 

means.show() 
df = df.join(means, cond).drop(df.group1).drop(df.group2).select(
    'group1', 'group2', 'name', 'value', 'mean_value'
)

final = df.withColumn(
    'diff', 
    F.abs(df.value - df.mean_value)) 
final.show() 

sc.stop() 
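For reference, the grouped-mean-and-diff computation above can be sanity-checked on a small sample in plain Python, without Spark. This is just an illustrative sketch (the `rows`, `grouped`, and `group_means` names are mine, not part of the Spark code), using the first few rows of the example data:

```python
from collections import defaultdict
from statistics import mean

# a small sample of (group2, name, value, group1) rows, as in data.csv
rows = [
    (100, 'name1', 0.43, 0),
    (100, 'name2', 0.33, 0),
    (100, 'name3', 0.73, 0),
]

# group values by (group1, group2), mirroring the groupBy above
grouped = defaultdict(list)
for group2, name, value, group1 in rows:
    grouped[(group1, group2)].append(value)

# mean per group, then absolute diff per row
group_means = {k: mean(v) for k, v in grouped.items()}
diffs = [
    (group1, group2, name, value,
     abs(value - group_means[(group1, group2)]))
    for group2, name, value, group1 in rows
]
for row in diffs:
    print(row)
```

This makes it easy to verify individual groups against whatever the Spark job produces.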

Feel free to comment on how I could make this better if you like. Here's an example data set I'm playing with:

100,name1,0.43,0 
100,name2,0.33,0 
100,name3,0.73,0 
101,name1,0.29,0 
101,name2,0.96,0 
101,name3,0.42,0 
102,name1,0.01,0 
102,name2,0.42,0 
102,name3,0.51,0 
103,name1,0.55,0 
103,name2,0.45,0 
103,name3,0.02,0 
104,name1,0.93,0 
104,name2,0.16,0 
104,name3,0.74,0 
105,name1,0.41,0 
105,name2,0.65,0 
105,name3,0.29,0 
100,name1,0.51,1 
100,name2,0.51,1 
100,name3,0.43,1 
101,name1,0.59,1 
101,name2,0.55,1 
101,name3,0.84,1 
102,name1,0.01,1 
102,name2,0.98,1 
102,name3,0.44,1 
103,name1,0.47,1 
103,name2,0.16,1 
103,name3,0.02,1 
104,name1,0.83,1 
104,name2,0.89,1 
104,name3,0.31,1 
105,name1,0.59,1 
105,name2,0.77,1 
105,name3,0.45,1 

And here's what I'm trying to produce:

group1,group2,name,value,median,diff 
0,100,name1,0.43,0.43,0.0 
0,100,name2,0.33,0.43,0.10 
0,100,name3,0.73,0.43,0.30 
0,101,name1,0.29,0.42,0.13 
0,101,name2,0.96,0.42,0.54 
0,101,name3,0.42,0.42,0.0 
0,102,name1,0.01,0.42,0.41 
0,102,name2,0.42,0.42,0.0 
0,102,name3,0.51,0.42,0.09 
0,103,name1,0.55,0.45,0.10 
0,103,name2,0.45,0.45,0.0 
0,103,name3,0.02,0.45,0.43 
0,104,name1,0.93,0.74,0.19 
0,104,name2,0.16,0.74,0.58 
0,104,name3,0.74,0.74,0.0 
0,105,name1,0.41,0.41,0.0 
0,105,name2,0.65,0.41,0.24 
0,105,name3,0.29,0.41,0.24 
1,100,name1,0.51,0.51,0.0 
1,100,name2,0.51,0.51,0.0 
1,100,name3,0.43,0.51,0.08 
1,101,name1,0.59,0.59,0.0 
1,101,name2,0.55,0.59,0.04 
1,101,name3,0.84,0.59,0.25 
1,102,name1,0.01,0.44,0.43 
1,102,name2,0.98,0.44,0.54 
1,102,name3,0.44,0.44,0.0 
1,103,name1,0.47,0.16,0.31 
1,103,name2,0.16,0.16,0.0 
1,103,name3,0.02,0.16,0.14 
1,104,name1,0.83,0.83,0.0 
1,104,name2,0.89,0.83,0.06 
1,104,name3,0.31,0.83,0.52 
1,105,name1,0.59,0.59,0.0 
1,105,name2,0.77,0.59,0.18 
1,105,name3,0.45,0.59,0.14 
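The numbers in the target output can be spot-checked with plain Python's `statistics.median`. A minimal check of one `(group1, group2)` block from the sample (the `values` dict is just an illustration of the `group1=0, group2=100` rows):

```python
from statistics import median

# the group1=0, group2=100 block from the sample data
values = {'name1': 0.43, 'name2': 0.33, 'name3': 0.73}

med = median(values.values())           # exact median of the group
diffs = {name: round(abs(v - med), 2)   # |value - median|, rounded
         for name, v in values.items()}

print(med)    # 0.43
print(diffs)  # {'name1': 0.0, 'name2': 0.1, 'name3': 0.3}
```

This matches the first three rows of the target output above.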

I'm trying to use a 'window' function. However, I still haven't managed to accomplish the task with the 'udf' median function I created. – titipata


My understanding is that to do this you need a udaf, since this would be implemented in .agg(...), but udafs aren't available in Python. – craigching


Yes, that's correct @craigching. I've just updated my attempt at the intended aggregation. However, it's still not the correct solution you asked for. – titipata

Answer


You can solve it with a udf median function. First, let's create the simple example given above:

# example data 
ls = [[100, 'name1', 0.43, 0],
      [100, 'name2', 0.33, 0],
      [100, 'name3', 0.73, 0],
      [101, 'name1', 0.29, 0],
      [101, 'name2', 0.96, 0],
      [...]]
df = spark.createDataFrame(ls, schema=['a', 'b', 'c', 'd']) 

Here is the udf function for computing the median:

# udf for median
import numpy as np
import pyspark.sql.functions as func
from pyspark.sql.functions import col
from pyspark.sql.types import FloatType

def median(values_list):
    """Return the median of a list of values as a plain Python float."""
    med = np.median(values_list)
    return float(med)

udf_median = func.udf(median, FloatType())

group_df = df.groupby(['a', 'd'])
df_grouped = group_df.agg(udf_median(func.collect_list(col('c'))).alias('median'))
df_grouped.show()
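The plain Python function can be exercised locally before it is wrapped as a udf. Note that np.median returns a NumPy scalar, which is why the float() cast is there (Spark's FloatType expects a Python float):

```python
import numpy as np

def median(values_list):
    # same function as above, minus the Spark wrapping
    med = np.median(values_list)
    return float(med)

print(median([0.43, 0.33, 0.73]))        # 0.43
print(type(median([0.43, 0.33, 0.73])))  # <class 'float'>
```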

Finally, you can join it back onto the original df to get the median column back:

df_grouped = df_grouped.withColumnRenamed('a', 'a_').withColumnRenamed('d', 'd_')
df_final = df.join(
    df_grouped, [df.a == df_grouped.a_, df.d == df_grouped.d_]
).select('a', 'b', 'c', 'median')
df_final = df_final.withColumn('diff', func.round(func.col('c') - func.col('median'), scale=2))

Note that I used round at the end to prevent extra digits from showing up after the median operation.
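As a reference check on that pipeline, here is the same group-median-and-rounded-diff computation in plain Python (statistics.median stands in for np.median; note the answer uses a signed diff, not an absolute one):

```python
from collections import defaultdict
from statistics import median

# rows of (a, b, c, d), as in the example DataFrame
rows = [
    (100, 'name1', 0.43, 0),
    (100, 'name2', 0.33, 0),
    (100, 'name3', 0.73, 0),
]

# group c by (a, d), as in df.groupby(['a', 'd'])
grouped = defaultdict(list)
for a, b, c, d in rows:
    grouped[(a, d)].append(c)
medians = {k: median(v) for k, v in grouped.items()}

# signed diff, rounded to 2 places, as in func.round(c - median, 2)
diffs = [round(c - medians[(a, d)], 2) for a, b, c, d in rows]
print(diffs)  # [0.0, -0.1, 0.3]
```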


Your udf median works for me. Are there any caveats to what you're doing there? – craigching


@craigching, yes, it works. It just doesn't give you the correct solution yet. You have to join on ('a', 'b', 'd') to accomplish the task. – titipata


If you want to clean that up, remove the first part and keep only the middle part, I'm willing to mark this as the answer, because it's exactly what I asked for and is working for me. Though I do think that if you had the window version, implementing it would be pretty simple too. – craigching