0

我有一個我想在PySpark 2.0中執行的操作,它很容易作爲df.rdd.map執行,但是由於我寧願留在Dataframe執行中引擎出於性能原因,我想找到一種只使用Dataframe操作的方法。在Spark Dataframe列中使用數據作爲條件或在另一個列表達式中輸入

操作,在RDD式的,是這樣的:

def precision_formatter(row): 
    formatter = "%.{}f".format(row.precision) 
    return row + [formatter % row.amount_raw/10 ** row.precision] 
df = df.rdd.map(precision_formatter) 

基本上,我有一列,告訴我,對於每一行,對我的字符串的精度格式化操作應該是什麼,我想根據精度選擇性地將'amount_raw'列格式化爲字符串。

回答

0

我不知道如何使用一列或多列的內容作爲另一列操作的輸入。我可以最接近的是建議使用Column.when和一組外部定義的布爾操作,這些布爾操作對應於一列或多列內可能的布爾條件/情況。在此特定情況下,例如,如果您可以獲得(或更好的已有)row.precision的所有可能值,那麼您可以遍歷該集並對該集中的每個值應用Column.when操作。我相信這套可以通過df.select('precision').distinct().collect()獲得。

因爲pyspark.sql.functions.whenColumn.when操作本身返回Column對象,你可以通過在集中的項目(但它獲得)迭代,並保持「追加」 when操作彼此編程,直到你已用盡設定:

import pyspark.sql.functions as PSF 

def format_amounts_with_precision(df, all_precisions_set): 
    amt_col = PSF.when(df['precision'] == 0, df['amount_raw'].cast(StringType())) 
    for precision in all_precisions_set: 
     if precision != 0: # this is a messy way of having a base case above 
      fmt_str = '%.{}f'.format(precision) 
      amt_col = amt_col.when(df['precision'] == precision, 
          PSF.format_string(fmt_str, df['amount_raw']/10 ** precision) 

    return df.withColumn('amount', amt_col) 
0

你可以用python UDF來做到這一點。它們可以獲取多個輸入值(來自一行的列的值)並吐出單個輸出值。這將是這個樣子:

from pyspark.sql import types as T, functions as F 
from pyspark.sql.function import udf, col 

# Create example data frame 
schema = T.StructType([ 
    T.StructField('precision', T.IntegerType(), False), 
    T.StructField('value', T.FloatType(), False) 
]) 

data = [ 
    (1, 0.123456), 
    (2, 0.123456), 
    (3, 0.123456) 
] 

rdd = sc.parallelize(data) 
df = sqlContext.createDataFrame(rdd, schema) 

# Define UDF and apply it 
def format_func(precision, value): 
    format_str = "{:." + str(precision) + "f}" 
    return format_str.format(value) 

format_udf = F.udf(format_func, T.StringType()) 

new_df = df.withColumn('formatted', format_udf('precision', 'value')) 
new_df.show() 

而且,如果不是列精度值你想使用一個全球性的,當你這樣稱呼它,你可以使用亮(..)功能:

new_df = df.withColumn('formatted', format_udf(F.lit(2), 'value')) 
相關問題