我正在讀取一個公共AWS存儲桶中的所有文本文件,該存儲桶包含數百個CSV文件。我一次性讀取所有CSV文件,把它們轉換爲RDD,然後開始整理數據,以便將其存儲到Cassandra中。處理全部文本文件需要兩個半小時,對於100GB的數據來說這太久了。我的代碼如下,有什麼辦法可以讓它運行得更快嗎?(標題:使用DataFrames和SQL提高PySpark的性能)
任何建議我都非常感謝。我也讀過這篇文章 https://robertovitillo.com/2015/06/30/spark-best-practices/ ,但對於如何實現其中提到的一些做法(例如「使用正確的並行度」)感到困惑。我還嘗試過用rdd.cache把RDD放入緩存,但仍然需要兩個多小時。
# Point the driver at the Spark master. SPARK_IP and SPARK_PORT are not
# defined in this snippet and must already be in scope.
conf = SparkConf().setMaster("spark://%s:%s" % (SPARK_IP, SPARK_PORT))
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# RDD of raw text lines from every object matching the events/ glob.
rdd = sc.textFile("s3a://gdelt-open-data/events/*")
def rddCleaning(rd, timeframe):
    """Aggregate tab-separated event lines per country and time bucket.

    Args:
        rd: RDD of raw text lines, one tab-separated event record per line.
        timeframe: "SQLDATE" selects column 1 and "MonthYear" column 2 as
            the time bucket; any other value selects column 3.

    Returns:
        RDD of ``(country_name, time_value, info)`` tuples. ``info`` maps
        each of the (at most five) event types with the most articles to
        ``{"ArticleMentions", "GoldsteinScaleAvg", "ToneAvg"}`` and also
        holds ``"TotalArticles": {"total": n}``, where ``n`` counts the
        articles of every event type in the group, not only the top five.

    Lines with too few fields, or whose numeric fields cannot be converted,
    are skipped instead of failing the whole job.
    """
    # Column positions inside a split record.
    actionGeo_CountryCode = 51
    actor1Type1Code = 12
    actor2Type1Code = 22
    numArticles = 33
    goldsteinScale = 30
    avgTone = 34
    time_column = {"SQLDATE": 1, "MonthYear": 2}.get(timeframe, 3)
    # Highest index read below is actionGeo_CountryCode; shorter rows would
    # raise IndexError inside the projection.
    min_fields = actionGeo_CountryCode + 1

    def check_valid(tup):
        """Return True when every numeric field survives the later casts."""
        try:
            int(tup[1])
            int(tup[4])
            # Mirror the int(float(...)) conversion applied after this
            # filter so values such as "nan" or "inf" are rejected here.
            int(float(tup[5]))
            int(float(tup[6]))
        except (ValueError, OverflowError):
            return False
        return True

    def fillin(tup):
        """Collapse the two actor codes into one, preferring actor 1."""
        use_second = tup[2] == "" and tup[3] != ""
        actor_code = tup[3] if use_second else tup[2]
        return (tup[0], tup[1], actor_code, tup[4], tup[5], tup[6])

    def sum_allevents(tup):
        """Append the article total over every event type of the group."""
        type_lst = tup[1]
        total_mentions = sum(event[1] for event in type_lst)
        return (tup[0], type_lst, tup[2], total_mentions)

    def popular_avg(curr_tup):
        """Keep the averages only for the event types left in the list."""
        lst = curr_tup[1]
        dictionary = curr_tup[2]
        dict_matches = {}
        for tup in lst:
            event_type = tup[0]
            dict_matches[event_type] = dictionary[event_type]
        return (curr_tup[0], lst, dict_matches, curr_tup[3])

    def event_todict(tup):
        """Turn the (event, articles) list into a dict keyed by event."""
        dict_matches = {}
        for event_tup in tup[1]:
            dict_matches[event_tup[0]] = {"ArticleMentions": event_tup[1]}
        return (tup[0], dict_matches, tup[2], tup[3])

    def merge_info(tup):
        """Merge the averages into the per-event dicts and add the total."""
        main_dict = tup[1]
        info_dict = tup[2]
        for key in info_dict:
            main_dict[key].update(info_dict[key])
        main_dict["TotalArticles"] = {"total": tup[3]}
        return (tup[0], main_dict)

    rdd_reduce = (
        rd.map(lambda x: x.split('\t'))
        .filter(lambda cols: len(cols) >= min_fields)
        .map(lambda y: (y[actionGeo_CountryCode],
                        y[time_column],
                        y[actor1Type1Code],
                        y[actor2Type1Code],
                        y[numArticles],
                        y[goldsteinScale],
                        y[avgTone]))
        .filter(check_valid)
        .map(lambda c: (c[0], int(c[1]), c[2], c[3], int(c[4]),
                        int(float(c[5])), int(float(c[6]))))
        .map(fillin)
        .filter(lambda r: r[0] in tofullname and r[2] in toevent
                and r[2] != "" and r[0] != "")
        .map(lambda t: (tofullname[t[0]], t[1], toevent[t[2]],
                        t[3], t[4], t[5]))
        # Value is (articles, goldstein_sum, tone_sum, row_count).
        .map(lambda f: ((f[0], f[1], f[2]), (f[3], f[4], f[5], 1)))
        .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1],
                                   a[2] + b[2], a[3] + b[3]))
        # NOTE(review): both operands are ints, so the averages follow the
        # running interpreter's `/` semantics - confirm which is intended.
        .map(lambda s: (s[0], (s[1][0],
                               s[1][1] / s[1][3],
                               s[1][2] / s[1][3])))
    )

    rdd_format = (
        rdd_reduce
        .map(lambda t: ((t[0][0], t[0][1]),
                        ([(t[0][2], t[1][0])],
                         [(t[0][2], {"GoldsteinScaleAvg": t[1][1],
                                     "ToneAvg": t[1][2]})])))
        .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
        # Order event types by article count, largest first.
        .map(lambda v: (v[0],
                        sorted(v[1][0], key=itemgetter(1), reverse=True),
                        v[1][1]))
        .map(sum_allevents)
        # The total above covers all event types; only five are kept.
        .map(lambda f: (f[0], f[1][:5], dict(f[2]), f[3]))
        .map(popular_avg)
        .map(event_todict)
        .map(merge_info)
        .map(lambda d: (d[0][0], d[0][1], d[1]))
    )
    return rdd_format
# Run the aggregation once per time granularity and print six records each.
daily_rdd = rddCleaning(rdd, "SQLDATE")
print(daily_rdd.take(6))

monthly_rdd = rddCleaning(rdd, "MonthYear")
print(monthly_rdd.take(6))

yearly_rdd = rddCleaning(rdd, "Year")
print(yearly_rdd.take(6))
編輯:我對代碼做了以下修改,性能有所提高,但仍然需要很長時間。這是不是因爲每次調用df時,它都會從S3存儲桶重新讀取所有文件?我是否應該把部分df和臨時表放入緩存?以下是我的代碼:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from pyspark.sql.functions import col
from pyspark.sql.types import StringType, DoubleType, IntegerType
from abbreviations_dict import tofullname, toevent
from operator import itemgetter
import pyspark_cassandra
sc = SparkContext()
sqlContext = SQLContext(sc)

# Every column is declared as a nullable string; the names are listed in
# file order and the schema is built from them in one pass.
customSchema = schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in (
        'GLOBALEVENTID',
        'SQLDATE',
        'MonthYear',
        'Year',
        'FractionDate',
        'Actor1Code',
        'Actor1Name',
        'Actor1CountryCode',
        'Actor1KnownGroupCode',
        'Actor1EthnicCode',
        'Actor1Religion1Code',
        'Actor1Religion2Code',
        'Actor1Type1Code',
        'Actor1Type2Code',
        'Actor1Type3Code',
        'Actor2Code',
        'Actor2Name',
        'Actor2CountryCode',
        'Actor2KnownGroupCode',
        'Actor2EthnicCode',
        'Actor2Religion1Code',
        'Actor2Religion2Code',
        'Actor2Type1Code',
        'Actor2Type2Code',
        'Actor2Type3Code',
        'IsRootEvent',
        'EventCode',
        'EventBaseCode',
        'EventRootCode',
        'QuadClass',
        'GoldsteinScale',
        'NumMentions',
        'NumSources',
        'NumArticles',
        'AvgTone',
        'Actor1Geo_Type',
        'Actor1Geo_FullName',
        'Actor1Geo_CountryCode',
        'Actor1Geo_ADM1Code',
        'Actor1Geo_Lat',
        'Actor1Geo_Long',
        'Actor1Geo_FeatureID',
        'Actor2Geo_Type',
        'Actor2Geo_FullName',
        'Actor2Geo_CountryCode',
        'Actor2Geo_ADM1Code',
        'Actor2Geo_Lat',
        'Actor2Geo_Long',
        'Actor2Geo_FeatureID',
        'ActionGeo_Type',
        'ActionGeo_FullName',
        'ActionGeo_CountryCode',
        'ActionGeo_ADM1Code',
        'ActionGeo_Lat',
        'ActionGeo_Long',
        'ActionGeo_FeatureID',
        'DATEADDED',
        'SOURCEURL',
    )
])

# Header-less, tab-delimited read of everything matching the events/ glob.
df = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .options(header='false', delimiter="\t")
    .load('s3a://gdelt-open-data/events/*', schema=customSchema)
)
def modify_values(r, y):
    """Return ``r``, falling back to ``y`` when ``r`` carries no value.

    Both the empty string and ``None`` count as "no value", so a missing
    primary code is replaced by the fallback, and a missing fallback never
    replaces an empty primary.

    Args:
        r: Primary value.
        y: Fallback value.

    Returns:
        ``y`` when ``r`` is empty/None and ``y`` is non-empty, else ``r``.
    """
    if not r and y:
        return y
    return r
def country_exists(r):
    """Map ``r`` through ``tofullname``; return '' when it has no entry."""
    return tofullname[r] if r in tofullname else ''
def event_exists(r):
    """Map ``r`` through ``toevent``; return '' when it has no entry."""
    return toevent[r] if r in toevent else ''
# Wrap the plain Python helpers as string-returning UDFs.
modify_val = udf(modify_values, StringType())
c_exists = udf(country_exists, StringType())
e_exists = udf(event_exists, StringType())

# Order matters: the actor code is filled in from actor 2 first, and only
# then translated through e_exists.
dfsub1 = (
    df
    .withColumn("Actor1Type1Code",
                modify_val(col("Actor1Type1Code"), col("Actor2Type1Code")))
    .withColumn("ActionGeo_CountryCode",
                c_exists(col("ActionGeo_CountryCode")))
    .withColumn("Actor1Type1Code",
                e_exists(col("Actor1Type1Code")))
)
# Expose the UDF-processed frame to SQL under the name 'temp'.
sqlContext.registerDataFrameAsTable(dfsub1, 'temp')
# Keep only the eight columns used below and drop rows where the country,
# actor code, article count, Goldstein scale or tone is the empty string.
df2 = sqlContext.sql("""SELECT ActionGeo_CountryCode,
SQLDATE, MonthYear, Year,
Actor1Type1Code,
NumArticles,
GoldsteinScale,
AvgTone
FROM temp
WHERE ActionGeo_CountryCode <> ''
AND Actor1Type1Code <> ''
AND NumArticles <> ''
AND GoldsteinScale <> ''
AND AvgTone <> ''""")
sqlContext.registerDataFrameAsTable(df2, 'temp2')
# Cast the three time columns and the three numeric columns to INTEGER;
# the country and actor columns stay as strings.
df3 = sqlContext.sql("""SELECT ActionGeo_CountryCode,
CAST(SQLDATE AS INTEGER), CAST(MonthYear AS INTEGER), CAST(Year AS INTEGER),
Actor1Type1Code,
CAST(NumArticles AS INTEGER),
CAST(GoldsteinScale AS INTEGER),
CAST(AvgTone AS INTEGER)
FROM temp2""")
sqlContext.registerDataFrameAsTable(df3, 'temp3')
# temp3 is the input of all three aggregations below, so it is cached.
sqlContext.cacheTable('temp3')
# Per (country, SQLDATE, actor code): summed articles, rounded averages.
dfdaily = sqlContext.sql("""SELECT ActionGeo_CountryCode,
SQLDATE,
Actor1Type1Code,
SUM(NumArticles) AS NumArticles,
ROUND(AVG(GoldsteinScale),0) AS GoldsteinScale,
ROUND(AVG(AvgTone),0) AS AvgTone
FROM temp3
GROUP BY ActionGeo_CountryCode,
SQLDATE,
Actor1Type1Code""")
# Same aggregation grouped by MonthYear instead of SQLDATE.
dfmonthly = sqlContext.sql("""SELECT ActionGeo_CountryCode,
MonthYear,
Actor1Type1Code,
SUM(NumArticles) AS NumArticles,
ROUND(AVG(GoldsteinScale),0) AS GoldsteinScale,
ROUND(AVG(AvgTone),0) as AvgTone
FROM temp3
GROUP BY ActionGeo_CountryCode,
MonthYear,
Actor1Type1Code""")
# Same aggregation grouped by Year.
dfyearly = sqlContext.sql("""SELECT ActionGeo_CountryCode,
Year,
Actor1Type1Code,
SUM(NumArticles) AS NumArticles,
ROUND(AVG(GoldsteinScale),0) AS GoldsteinScale,
ROUND(AVG(AvgTone),0) as AvgTone
FROM temp3
GROUP BY ActionGeo_CountryCode,
Year,
Actor1Type1Code""")
def rddCleaning(rd, timeframe):
    """Fold aggregated rows into one record per (country, time bucket).

    Args:
        rd: RDD of rows supporting lookup by column name; each row provides
            "ActionGeo_CountryCode", "Actor1Type1Code", "NumArticles",
            "GoldsteinScale", "AvgTone" and the ``timeframe`` column.
        timeframe: Name of the column used as the time bucket.

    Returns:
        RDD of ``(country, time_value, info)`` tuples. ``info`` maps every
        event type of the group to ``{"ArticleMentions", "Goldstein",
        "ToneAvg"}`` and also holds ``"TotalArticles": {"total": n}``.
    """
    def to_pair(row):
        # Key by (country, bucket); carry one-element lists so that
        # reduceByKey can concatenate them.
        event = row["Actor1Type1Code"]
        scores = {"Goldstein": row["GoldsteinScale"], "ToneAvg": row["AvgTone"]}
        return ((row["ActionGeo_CountryCode"], row[timeframe]),
                ([(event, row["NumArticles"])], [(event, scores)]))

    def sum_allevents(tup):
        # Append the article total across every event type of the group.
        type_lst = tup[1]
        return (tup[0], type_lst, tup[2], sum(event[1] for event in type_lst))

    def popular_avg(curr_tup):
        # Re-key the score dict following the order of the sorted list.
        lst, dictionary = curr_tup[1], curr_tup[2]
        dict_matches = {tup[0]: dictionary[tup[0]] for tup in lst}
        return (curr_tup[0], lst, dict_matches, curr_tup[3])

    def event_todict(tup):
        # Replace the (event, articles) list with a dict keyed by event.
        dict_matches = {
            event_tup[0]: {"ArticleMentions": event_tup[1]}
            for event_tup in tup[1]
        }
        return (tup[0], dict_matches, tup[2], tup[3])

    def merge_info(tup):
        # Fold the scores into each event's dict, then add the group total.
        main_dict = tup[1]
        for key, extra in tup[2].items():
            main_dict[key].update(extra)
        main_dict["TotalArticles"] = {"total": tup[3]}
        return (tup[0], main_dict)

    rdd_format = (
        rd.map(to_pair)
        .reduceByKey(lambda left, right: (left[0] + right[0],
                                          left[1] + right[1]))
        # Event types ordered by article count, largest first.
        .map(lambda grouped: (grouped[0],
                              sorted(grouped[1][0],
                                     key=itemgetter(1),
                                     reverse=True),
                              dict(grouped[1][1])))
        .map(sum_allevents)
        .map(popular_avg)
        .map(event_todict)
        .map(merge_info)
        .map(lambda rec: (rec[0][0], rec[0][1], rec[1]))
    )
    return rdd_format
# Format each aggregate's underlying RDD and print five records, with a
# banner before each so the three outputs can be told apart.
print("THIS IS THE FIRST ONE ######################################################")
daily_rdd = rddCleaning(dfdaily.rdd,"SQLDATE")
print(daily_rdd.take(5))
print("THIS IS THE SECOND ONE ######################################################")
monthly_rdd = rddCleaning(dfmonthly.rdd,"MonthYear")
print(monthly_rdd.take(5))
print("THIS IS THE THIRD ONE ######################################################")
yearly_rdd = rddCleaning(dfyearly.rdd,"Year")
print(yearly_rdd.take(5))
首先,你應該去掉所有的udf,它們都不是必要的。 – zero323
謝謝你的回覆!我真的非常感激。我使用udf,是因爲我不確定如何在SQL中使用Python字典。在其中兩個udf中,我根據存儲在另一個Python文件中的字典對單元格的值進行重命名。例如,如果某一列中的單元格是縮寫「US」,我就在udf中把它改爲全名「United States」。我可以在SQL中使用字典變量嗎? – CatherineAlv
您可以使用帶有`broadcast`的`join`,或者使用[literal map](http://stackoverflow.com/a/32788650/1560062)。 – zero323