
PySpark: compute the row-wise maximum of a subset of columns and add it to an existing DataFrame

I want to compute the maximum over a subset of columns for each row and add the result as a new column to an existing DataFrame.

I was able to do this, but only in a very awkward way:

def add_colmax(df, subset_columns, colnm): 
    ''' 
    Calculate the maximum of the selected "subset_columns" of dataframe df for each row; 
    a new column containing the row-wise maximum is added to dataframe df. 

    df: dataframe. It must contain subset_columns as a subset of its columns 
    colnm: name of the new column containing the row-wise maximum of subset_columns 
    subset_columns: the subset of columns from which the row-wise maximum is taken 
    ''' 
    import numpy as np 
    from pyspark.sql.functions import monotonicallyIncreasingId 
    from pyspark.sql import Row 

    def get_max_row_with_None(row): 
        # None compares lower than any number under Python 2, so np.max skips it 
        return float(np.max(row)) 

    df_subset = df.select(subset_columns) 
    rdd = df_subset.map(get_max_row_with_None) 
    df_rowsum = rdd.map(Row(colnm)).toDF() 
    # attach a synthetic id to both frames so the new column can be joined back 
    df_rowsum = df_rowsum.withColumn("id", monotonicallyIncreasingId()) 
    df = df.withColumn("id", monotonicallyIncreasingId()) 
    df = df.join(df_rowsum, df.id == df_rowsum.id).drop(df.id).drop(df_rowsum.id) 
    return df 

Here is the function in action:

rdd1 = sc.parallelize([("foo", 1.0,3.0,None), 
        ("bar", 2.0,2.0,-10), 
        ("baz", 3.3,1.2,10.0)]) 


df1 = sqlContext.createDataFrame(rdd1, ('v1', 'v2','v3','v4')) 
df_new = add_colmax(df1,['v2','v3','v4'],"rowsum") 
df_new.collect() 

returns:

[Row(v1=u'bar', v2=2.0, v3=2.0, v4=-10, rowsum=2.0), 
    Row(v1=u'baz', v2=3.3, v3=1.2, v4=None, rowsum=3.3), 
    Row(v1=u'foo', v2=1.0, v3=3.0, v4=None, rowsum=3.0)] 

I think this could be done much more simply with a user-defined function and withColumn, but I can't figure out how. Please let me know if you have an easier way to achieve this. I'm using Spark 1.6.

Answer


Let's start with a couple of imports:

from pyspark.sql.functions import col, lit, coalesce, greatest 

Next, define a negative infinity literal:

minf = lit(float("-inf")) 

Map over the columns, coalescing nulls to minf, and pass the results to greatest:

rowmax = greatest(*[coalesce(col(x), minf) for x in ['v2','v3','v4']]) 

and finally withColumn:

df1.withColumn("rowmax", rowmax) 

with the result:

+---+---+---+----+------+ 
| v1| v2| v3| v4|rowmax| 
+---+---+---+----+------+ 
|foo|1.0|3.0|null| 3.0| 
|bar|2.0|2.0| -10| 2.0| 
|baz|3.3|1.2|null| 3.3| 
+---+---+---+----+------+ 

You can use the same pattern with different row-wise operations, replacing minf with the appropriate neutral element. For example:

rowsum = sum([coalesce(col(x), lit(0)) for x in ['v2','v3','v4']]) 

or:

from operator import mul 
from functools import reduce 

rowproduct = reduce(
    mul, 
    [coalesce(col(x), lit(1)) for x in ['v2','v3','v4']] 
) 
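For reference, a minimal usage sketch (my addition, assuming df1, rowsum, and rowproduct as defined above) attaching both null-safe aggregates at once:

# sketch: add both null-safe row-wise aggregates to the example DataFrame 
df1.withColumn("rowsum", rowsum).withColumn("rowproduct", rowproduct).show() 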

Your own code can be simplified significantly with a udf:

from pyspark.sql.types import DoubleType 
from pyspark.sql.functions import udf 

def get_max_row_with_None_(*cols): 
    return float(max(x for x in cols if x is not None)) 

get_max_row_with_None = udf(get_max_row_with_None_, DoubleType()) 
df1.withColumn("rowmax", get_max_row_with_None('v2','v3','v4')) 
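One caveat: max() over an empty sequence raises a ValueError, so this udf fails on rows where every column is null. A guarded variant, sketched under the assumption that returning null for such rows is acceptable (the _safe names are just illustrative):

def get_max_row_with_None_safe_(*cols): 
    # return None instead of raising when every column is null 
    vals = [x for x in cols if x is not None] 
    return float(max(vals)) if vals else None 

get_max_row_with_None_safe = udf(get_max_row_with_None_safe_, DoubleType()) 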

Replace minf with lit(float("inf")) and greatest with least to get the minimum value per row.
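Sketched out under the same column names as above (pinf is just an illustrative name for the positive-infinity literal):

from pyspark.sql.functions import least 

# positive infinity is the neutral element for a row-wise minimum 
pinf = lit(float("inf")) 
rowmin = least(*[coalesce(col(x), pinf) for x in ['v2', 'v3', 'v4']]) 
df1.withColumn("rowmin", rowmin) 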
