2017-07-02 140 views
0

我有以下代碼中使用:火花RDD.map火花數據幀withColumn方法

from pyspark import *; 
from pyspark.sql import *; 
from pyspark.sql.functions import udf 
from pyspark.sql.types import StringType, StructType, StructField, IntegerType, DoubleType 
import math; 

sc = SparkContext.getOrCreate(); 
spark = SparkSession.builder.master('local').getOrCreate(); 


schema = StructType([ 
    StructField("INDEX", IntegerType(), True), 
    StructField("SYMBOL", StringType(), True), 
    StructField("DATETIMETS", StringType(), True), 
    StructField("PRICE", DoubleType(), True), 
    StructField("SIZE", IntegerType(), True), 
]) 

df = spark\ 
    .createDataFrame(
     data=[(0,'A','2002-12-02 9:30:20',19.75,30200), 
      (1,'A','2002-12-02 9:31:20',19.75,30200),    
      (8,'A','2004-12-02 10:36:20',1.0,30200), 
      (9,'A','2006-12-02 22:41:20',20.0,30200), 
      (10,'A','2006-12-02 22:42:20',40.0,30200)], 
     schema=schema); 

我然後做一些計算,而不使用火花。 這工作正常。

def without_spark(price):  
    first_summation = sum(map(lambda n: math.sqrt(price), range(1,10))); 
    return first_summation; 

u_without_spark = udf(without_spark, DoubleType()) 

df.withColumn("NEW_COL", u_without_spark('PRICE')).show() 

No Spark Output

但是使用RDD並行下面的代碼沒有。

def with_spark(price):  
    rdd = sc.parallelize(1, 10) 
    first_summation = rdd.map(lambda n: math.sqrt(price)); 
    return first_summation.sum(); 

u_with_spark = udf(with_spark, DoubleType()) 

df.withColumn("NEW_COL", u_with_spark('PRICE')).show() 

Spark Error

是什麼,我試圖做不可能的? 有沒有更快的方法來做到這一點?

感謝您的幫助

回答

0

你不能調用任何RDD方法從UDF內。

當您創建一個UDF時,它將在工作人員上運行。 RDD或數據幀操作只能在驅動程序上運行,因此在UDF中不允許。

看起來好像你的目標是做一個UDAF(用戶定義的聚合方法)。這不能從pyspark完成。你有兩個選擇。可以使用collect_list,然後在生成的數組上執行UDF,或者在scala中寫入UDAF並將其包裝爲pyspark。

0

我然後做一些計算,而不使用火花

當你創建dataframe,您使用SparkSession,所以你已經在使用火花udfwithColumn火花數據幀的apis,用於轉換dataframe

Dataframes在本質上是分佈的,即在工作者節點上完成在dataframes上的所有轉換。所以udf使用withColumntransformation都是在工作節點上完成的。您在驅動程序節點中創建了不能用於​​轉換的sparkContextsc)。

是我試圖做不到的?有沒有更快的方法來做到這一點?

你的第二個實施是錯了,因爲你正試圖從轉型中訪問sparkContext

你的第一個方法似乎工作正常,並已使用火花。所以我猜你不需要尋找替代品。