I have the following code that uses Spark's RDD.map together with the DataFrame withColumn method:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, StructType, StructField, IntegerType, DoubleType
import math

sc = SparkContext.getOrCreate()
spark = SparkSession.builder.master('local').getOrCreate()
schema = StructType([
    StructField("INDEX", IntegerType(), True),
    StructField("SYMBOL", StringType(), True),
    StructField("DATETIMETS", StringType(), True),
    StructField("PRICE", DoubleType(), True),
    StructField("SIZE", IntegerType(), True),
])
df = spark.createDataFrame(
    data=[(0, 'A', '2002-12-02 9:30:20', 19.75, 30200),
          (1, 'A', '2002-12-02 9:31:20', 19.75, 30200),
          (8, 'A', '2004-12-02 10:36:20', 1.0, 30200),
          (9, 'A', '2006-12-02 22:41:20', 20.0, 30200),
          (10, 'A', '2006-12-02 22:42:20', 40.0, 30200)],
    schema=schema)
I then do some computation without using Spark, and this works fine:
def without_spark(price):
    # Sums sqrt(price) nine times (n is unused), i.e. 9 * sqrt(price)
    first_summation = sum(map(lambda n: math.sqrt(price), range(1, 10)))
    return first_summation
u_without_spark = udf(without_spark, DoubleType())
df.withColumn("NEW_COL", u_without_spark('PRICE')).show()
But the version below, which parallelizes the same summation with an RDD, does not:
def with_spark(price):
    # Distribute the range across an RDD and sum sqrt(price) over it
    rdd = sc.parallelize(range(1, 10))
    first_summation = rdd.map(lambda n: math.sqrt(price))
    return first_summation.sum()
u_with_spark = udf(with_spark, DoubleType())
df.withColumn("NEW_COL", u_with_spark('PRICE')).show()
Is what I am trying to do impossible? Or is there a faster way to do this?
Thanks for your help.