
Improving PySpark performance with DataFrames and SQL

I am reading all of the text files from a public AWS bucket that contains several hundred CSV files. I read all of the CSV files at once, convert them into an RDD, and then massage the data so it can be stored in Cassandra. Processing all of the text files takes two and a half hours, which seems far too long for 100 GB of data. Is there anything I can do to my code below to make it faster?

I would appreciate any suggestions. I also tried reading https://robertovitillo.com/2015/06/30/spark-best-practices/, but I am confused about how to implement some of the things it mentions, such as "use the right level of parallelism". I also tried caching the RDD with rdd.cache, but it still takes over two hours.

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from abbreviations_dict import tofullname, toevent
from operator import itemgetter

conf = SparkConf() \
    .setMaster("spark://%s:%s" % (SPARK_IP, SPARK_PORT))

sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
rdd = sc.textFile("s3a://gdelt-open-data/events/*")

def rddCleaning(rd, timeframe):

    def check_valid(tup):
        try:
            int(tup[1])
            int(tup[4])
            float(tup[5])
            float(tup[6])
            return True
        except ValueError:
            return False

    def fillin(tup):
        if tup[2] == "" and tup[3] != "":
            return ((tup[0], tup[1], tup[3], tup[4], tup[5], tup[6]))
        else:
            return ((tup[0], tup[1], tup[2], tup[4], tup[5], tup[6]))

    def popular_avg(curr_tup):
        lst = curr_tup[1]
        dictionary = curr_tup[2]
        dict_matches = {}
        for tup in lst:
            event_type = tup[0]
            dict_matches[event_type] = dictionary[event_type]
        return ((curr_tup[0], lst, dict_matches, curr_tup[3]))

    def merge_info(tup):
        main_dict = tup[1]
        info_dict = tup[2]
        for key in info_dict:
            main_dict[key].update(info_dict[key])
        main_dict["TotalArticles"] = {"total": tup[3]}
        return ((tup[0], main_dict))

    def event_todict(tup):
        lst = tup[1]
        dict_matches = {}
        for event_tup in lst:
            dict_matches[event_tup[0]] = {"ArticleMentions": event_tup[1]}
        return ((tup[0], dict_matches, tup[2], tup[3]))

    def sum_allevents(tup):
        type_lst = tup[1]
        total_mentions = 0
        for event in type_lst:
            total_mentions += event[1]
        return ((tup[0], type_lst, tup[2], total_mentions))

    actionGeo_CountryCode = 51
    time = 0
    actor1Type1Code = 12
    actor2Type1Code = 22
    numArticles = 33
    goldsteinScale = 30
    avgTone = 34

    if timeframe == "SQLDATE":
        time = 1
    elif timeframe == "MonthYear":
        time = 2
    else:
        time = 3

    rdd_reduce = rd.map(lambda x: x.split('\t')) \
                   .map(lambda y: ((y[actionGeo_CountryCode],
                                    y[time],
                                    y[actor1Type1Code],
                                    y[actor2Type1Code],
                                    y[numArticles],
                                    y[goldsteinScale],
                                    y[avgTone]))) \
                   .filter(check_valid) \
                   .map(lambda c: ((c[0], int(c[1]), c[2], c[3], int(c[4]), int(float(c[5])), int(float(c[6]))))) \
                   .map(fillin) \
                   .filter(lambda r: r[0] in tofullname and r[2] in toevent and r[2] != "" and r[0] != "") \
                   .map(lambda t: ((tofullname[t[0]], t[1], toevent[t[2]], t[3], t[4], t[5]))) \
                   .map(lambda f: (((f[0], f[1], f[2]), (f[3], f[4], f[5], 1)))) \
                   .reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1], a[2]+b[2], a[3]+b[3])) \
                   .map(lambda s: ((s[0], (s[1][0], s[1][1]/s[1][3], s[1][2]/s[1][3]))))

    rdd_format = rdd_reduce.map(lambda t: ((t[0][0], t[0][1]),
                                           ([(t[0][2], t[1][0])],
                                            [(t[0][2], {"GoldsteinScaleAvg": t[1][1],
                                                        "ToneAvg": t[1][2]})]))) \
                           .reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1])) \
                           .map(lambda v: (v[0],
                                           sorted(v[1][0], key=itemgetter(1), reverse=True),
                                           v[1][1])) \
                           .map(sum_allevents) \
                           .map(lambda f: ((f[0], f[1][:5], dict(f[2]), f[3]))) \
                           .map(popular_avg) \
                           .map(event_todict) \
                           .map(merge_info) \
                           .map(lambda d: ((d[0][0], d[0][1], d[1])))

    return rdd_format




daily_rdd = rddCleaning(rdd, "SQLDATE")
print(daily_rdd.take(6))
monthly_rdd = rddCleaning(rdd, "MonthYear")
print(monthly_rdd.take(6))
yearly_rdd = rddCleaning(rdd, "Year")
print(yearly_rdd.take(6))
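For context, this is the direction I understood for the "right level of parallelism" and caching suggestions, though I am not sure it is correct; the minPartitions value of 400 is only a guess on my part, not something tuned for my cluster:

from pyspark import StorageLevel

# Sketch only: ask for more input partitions up front and keep the raw RDD in
# memory, spilling to disk if it does not fit. 400 is a placeholder value; my
# assumption is it should be a small multiple of the total executor cores.
rdd = sc.textFile("s3a://gdelt-open-data/events/*", minPartitions=400)
rdd.persist(StorageLevel.MEMORY_AND_DISK)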

Here is a screenshot of my PySpark run, taken after applying the suggestions: [Spark UI screenshot omitted]

Edit: I made the following changes to my code and it improved performance, but it is still taking a long time. Is that because every time I use the DataFrame it re-reads all of the files from the S3 bucket? Should I put some of my DataFrames and temporary tables into the cache? Here is my code:

from pyspark import SparkContext, SparkConf 
from pyspark.sql import SQLContext 
from pyspark.sql.types import * 
from pyspark.sql.functions import udf 
from pyspark.sql.functions import col 
from pyspark.sql.types import StringType, DoubleType, IntegerType 
from abbreviations_dict import tofullname, toevent 
from operator import itemgetter 
import pyspark_cassandra 

sc = SparkContext() 
sqlContext = SQLContext(sc) 

customSchema = StructType([
     StructField('GLOBALEVENTID',StringType(),True), 
     StructField('SQLDATE',StringType(),True), 
     StructField('MonthYear',StringType(),True), 
     StructField('Year',StringType(),True), 
     StructField('FractionDate',StringType(),True), 
     StructField('Actor1Code',StringType(),True), 
     StructField('Actor1Name',StringType(),True), 
     StructField('Actor1CountryCode',StringType(),True), 
     StructField('Actor1KnownGroupCode',StringType(),True), 
     StructField('Actor1EthnicCode',StringType(),True), 
     StructField('Actor1Religion1Code',StringType(),True), 
     StructField('Actor1Religion2Code',StringType(),True), 
     StructField('Actor1Type1Code',StringType(),True), 
     StructField('Actor1Type2Code',StringType(),True), 
     StructField('Actor1Type3Code',StringType(),True), 
     StructField('Actor2Code',StringType(),True), 
     StructField('Actor2Name',StringType(),True), 
     StructField('Actor2CountryCode',StringType(),True), 
     StructField('Actor2KnownGroupCode',StringType(),True), 
     StructField('Actor2EthnicCode',StringType(),True), 
     StructField('Actor2Religion1Code',StringType(),True), 
     StructField('Actor2Religion2Code',StringType(),True), 
     StructField('Actor2Type1Code',StringType(),True), 
     StructField('Actor2Type2Code',StringType(),True), 
     StructField('Actor2Type3Code',StringType(),True), 
     StructField('IsRootEvent',StringType(),True), 
     StructField('EventCode',StringType(),True), 
     StructField('EventBaseCode',StringType(),True), 
     StructField('EventRootCode',StringType(),True), 
     StructField('QuadClass',StringType(),True), 
     StructField('GoldsteinScale',StringType(),True), 
     StructField('NumMentions',StringType(),True), 
     StructField('NumSources',StringType(),True), 
     StructField('NumArticles',StringType(),True), 
     StructField('AvgTone',StringType(),True), 
     StructField('Actor1Geo_Type',StringType(),True), 
     StructField('Actor1Geo_FullName',StringType(),True), 
     StructField('Actor1Geo_CountryCode',StringType(),True), 
     StructField('Actor1Geo_ADM1Code',StringType(),True), 
     StructField('Actor1Geo_Lat',StringType(),True), 
     StructField('Actor1Geo_Long',StringType(),True), 
     StructField('Actor1Geo_FeatureID',StringType(),True), 
     StructField('Actor2Geo_Type',StringType(),True), 
     StructField('Actor2Geo_FullName',StringType(),True), 
     StructField('Actor2Geo_CountryCode',StringType(),True), 
     StructField('Actor2Geo_ADM1Code',StringType(),True), 
     StructField('Actor2Geo_Lat',StringType(),True), 
     StructField('Actor2Geo_Long',StringType(),True), 
     StructField('Actor2Geo_FeatureID',StringType(),True), 
     StructField('ActionGeo_Type',StringType(),True), 
     StructField('ActionGeo_FullName',StringType(),True), 
     StructField('ActionGeo_CountryCode',StringType(),True), 
     StructField('ActionGeo_ADM1Code',StringType(),True), 
     StructField('ActionGeo_Lat',StringType(),True), 
     StructField('ActionGeo_Long',StringType(),True), 
     StructField('ActionGeo_FeatureID',StringType(),True), 
     StructField('DATEADDED',StringType(),True), 
     StructField('SOURCEURL',StringType(),True)]) 

df = sqlContext.read \ 
    .format('com.databricks.spark.csv') \ 
    .options(header='false') \ 
    .options(delimiter="\t") \ 
    .load('s3a://gdelt-open-data/events/*', schema = customSchema) 

def modify_values(r, y):
    if r == '' and y != '':
        return y
    else:
        return r

def country_exists(r):
    if r in tofullname:
        return tofullname[r]
    else:
        return ''

def event_exists(r):
    if r in toevent:
        return toevent[r]
    else:
        return ''


modify_val = udf(modify_values, StringType()) 
c_exists = udf(country_exists,StringType()) 
e_exists = udf(event_exists,StringType()) 
dfsub1 = df.withColumn("Actor1Type1Code",modify_val(col("Actor1Type1Code"),col("Actor2Type1Code"))) \ 
      .withColumn("ActionGeo_CountryCode",c_exists(col("ActionGeo_CountryCode"))) \ 
      .withColumn("Actor1Type1Code",e_exists(col("Actor1Type1Code"))) 

sqlContext.registerDataFrameAsTable(dfsub1, 'temp') 
df2 = sqlContext.sql("""SELECT ActionGeo_CountryCode, 
           SQLDATE, MonthYear, Year, 
           Actor1Type1Code, 
           NumArticles, 
           GoldsteinScale, 
           AvgTone 
          FROM temp 
         WHERE ActionGeo_CountryCode <> '' 
          AND Actor1Type1Code <> '' 
          AND NumArticles <> '' 
          AND GoldsteinScale <> '' 
          AND AvgTone <> ''""") 

sqlContext.registerDataFrameAsTable(df2, 'temp2') 
df3 = sqlContext.sql("""SELECT ActionGeo_CountryCode, 
           CAST(SQLDATE AS INTEGER), CAST(MonthYear AS INTEGER), CAST(Year AS INTEGER), 
           Actor1Type1Code, 
           CAST(NumArticles AS INTEGER), 
           CAST(GoldsteinScale AS INTEGER), 
           CAST(AvgTone AS INTEGER) 
          FROM temp2""") 

sqlContext.registerDataFrameAsTable(df3, 'temp3') 
sqlContext.cacheTable('temp3') 

dfdaily = sqlContext.sql("""SELECT ActionGeo_CountryCode, 
            SQLDATE, 
            Actor1Type1Code, 
            SUM(NumArticles) AS NumArticles, 
            ROUND(AVG(GoldsteinScale),0) AS GoldsteinScale, 
            ROUND(AVG(AvgTone),0) AS AvgTone 
           FROM temp3 
           GROUP BY ActionGeo_CountryCode, 
             SQLDATE, 
             Actor1Type1Code""") 

dfmonthly = sqlContext.sql("""SELECT ActionGeo_CountryCode, 
            MonthYear, 
            Actor1Type1Code, 
            SUM(NumArticles) AS NumArticles, 
            ROUND(AVG(GoldsteinScale),0) AS GoldsteinScale, 
            ROUND(AVG(AvgTone),0) as AvgTone 
           FROM temp3 
            GROUP BY ActionGeo_CountryCode, 
            MonthYear, 
            Actor1Type1Code""") 

dfyearly = sqlContext.sql("""SELECT ActionGeo_CountryCode, 
            Year, 
            Actor1Type1Code, 
            SUM(NumArticles) AS NumArticles, 
            ROUND(AVG(GoldsteinScale),0) AS GoldsteinScale, 
            ROUND(AVG(AvgTone),0) as AvgTone 
           FROM temp3 
           GROUP BY ActionGeo_CountryCode, 
             Year, 
             Actor1Type1Code""") 

def rddCleaning(rd, timeframe):

    def popular_avg(curr_tup):
        lst = curr_tup[1]
        dictionary = curr_tup[2]
        dict_matches = {}
        for tup in lst:
            event_type = tup[0]
            dict_matches[event_type] = dictionary[event_type]
        return ((curr_tup[0], lst, dict_matches, curr_tup[3]))

    def merge_info(tup):
        main_dict = tup[1]
        info_dict = tup[2]
        for key in info_dict:
            main_dict[key].update(info_dict[key])
        main_dict["TotalArticles"] = {"total": tup[3]}
        return ((tup[0], main_dict))

    def event_todict(tup):
        lst = tup[1]
        dict_matches = {}
        for event_tup in lst:
            dict_matches[event_tup[0]] = {"ArticleMentions": event_tup[1]}
        return ((tup[0], dict_matches, tup[2], tup[3]))

    def sum_allevents(tup):
        type_lst = tup[1]
        total_mentions = 0
        for event in type_lst:
            total_mentions += event[1]
        return ((tup[0], type_lst, tup[2], total_mentions))

    rdd_format = rd.map(lambda y: ((y["ActionGeo_CountryCode"], y[timeframe]),
                                   ([(y["Actor1Type1Code"], y["NumArticles"])],
                                    [(y["Actor1Type1Code"], {"Goldstein": y["GoldsteinScale"],
                                                             "ToneAvg": y["AvgTone"]})]))) \
                   .reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1])) \
                   .map(lambda v: (v[0],
                                   sorted(v[1][0], key=itemgetter(1), reverse=True),
                                   dict(v[1][1]))) \
                   .map(sum_allevents) \
                   .map(popular_avg) \
                   .map(event_todict) \
                   .map(merge_info) \
                   .map(lambda d: ((d[0][0], d[0][1], d[1])))

    return rdd_format

print("THIS IS THE FIRST ONE ######################################################") 
daily_rdd = rddCleaning(dfdaily.rdd,"SQLDATE") 
print(daily_rdd.take(5)) 
print("THIS IS THE SECOND ONE ######################################################") 
monthly_rdd = rddCleaning(dfmonthly.rdd,"MonthYear") 
print(monthly_rdd.take(5)) 
print("THIS IS THE THIRD ONE ######################################################") 
yearly_rdd = rddCleaning(dfyearly.rdd,"Year") 
print(yearly_rdd.take(5)) 
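As a follow-up to my own question about re-reading from S3: my understanding is that cacheTable is lazy, so something like the following sketch (forcing one pass with count() before the three aggregations) might be what is needed; this is an assumption on my part, not something I have verified:

# Assumption: the cache for 'temp3' is only populated by the first action that
# touches it, so forcing a single full pass here should let dfdaily, dfmonthly
# and dfyearly all reuse the in-memory data instead of re-scanning the bucket.
sqlContext.cacheTable('temp3')
df3.count()   # one full read of S3 that materializes the cached table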

For starters, I would drop all of the UDFs. None of them are necessary. – zero323


Thank you for your response! I really appreciate it. I used UDFs because I was not sure how to use a Python dictionary in SQL. In two of the UDFs I rename values based on a dictionary stored in another Python file. For example, if a cell in a column contains a country code like 'US', the UDF renames it to the full name 'United States'. Can I have a dictionary variable in SQL? – CatherineAlv


You can use a 'join' with 'broadcast' or a [literal map](http://stackoverflow.com/a/32788650/1560062). – zero323
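To illustrate the literal-map suggestion, a rough, untested sketch, assuming Spark 2.x (where create_map is available) and that tofullname is a plain Python dict mapping codes to names; note that unmatched codes come back as NULL here rather than '', so the later filter would need to check IS NOT NULL instead:

from itertools import chain
from pyspark.sql.functions import create_map, lit, col

# Build a literal MapType column from the ordinary Python dict so the lookup
# happens inside the JVM instead of in a Python UDF.
name_map = create_map([lit(x) for x in chain(*tofullname.items())])

dfsub1 = df.withColumn("ActionGeo_CountryCode",
                       name_map.getItem(col("ActionGeo_CountryCode")))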

Answer


The most immediate thing I can think of is to use DataFrames instead of RDDs. RDDs in Python are much slower than in Scala because of the conversion between Python and the JVM, and DataFrames also benefit from many optimizations.

It is hard to follow all of the code here and suggest an exact conversion, but as a starting point you can use spark.read.csv to read the CSV files directly into a DataFrame (and set a schema so that much of the validation happens automatically), and the many built-in functions should make the rest easier to write.
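A rough, untested sketch of that idea, assuming Spark 2.x (SparkSession and the built-in csv reader) and reusing the customSchema from the question, but with the numeric columns declared as IntegerType/DoubleType so the manual casts go away (malformed values should then just become nulls in the default permissive mode):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("gdelt").getOrCreate()

# customSchema is assumed to be the full GDELT event schema from the question,
# with NumArticles/GoldsteinScale/AvgTone typed numerically.
df = spark.read.csv("s3a://gdelt-open-data/events/*",
                    sep="\t", header=False, schema=customSchema)

daily = (df.where(F.col("ActionGeo_CountryCode").isNotNull() &
                  F.col("Actor1Type1Code").isNotNull())
           .groupBy("ActionGeo_CountryCode", "SQLDATE", "Actor1Type1Code")
           .agg(F.sum("NumArticles").alias("NumArticles"),
                F.round(F.avg("GoldsteinScale"), 0).alias("GoldsteinScale"),
                F.round(F.avg("AvgTone"), 0).alias("AvgTone")))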


If I read the CSV as a DataFrame, can I no longer use the .map and .reduce functions? I need .map and .reduce to perform many transformations on my data. I am using one master and three slaves and I want to use all of my resources. If I use an RDD, the tasks are parallelized across the other nodes. – CatherineAlv


@CatherineAlv It sounds like you are assuming you have to switch back and forth between RDDs and DataFrames; could you share some more details? – eliasah


A quick look at your code suggests that everything you do in .map and .reduce can be done with DataFrames (by using DataFrame functions). Spark DataFrames (part of the Spark SQL package, pyspark.sql) are distributed just like RDDs, with more optimizations on top. If something is missing you can usually work around it with a UDF (although Python UDFs are slower than Scala UDFs and slower than the functions available in pyspark.sql.functions). Try the Spark SQL programming guide for more information. –
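For instance, a rough, untested sketch (assuming Spark 2.x) of how the per-day grouping from rddCleaning might look with only DataFrame functions, using the dfdaily columns from the question; collect_list and sort_array stand in for the reduceByKey and sorted calls:

from pyspark.sql import functions as F

# Collect each (article count, event type) pair into a struct per
# (country, date) group and sort the list by article count, descending.
per_event = F.struct(F.col("NumArticles"), F.col("Actor1Type1Code"))

nested = (dfdaily.groupBy("ActionGeo_CountryCode", "SQLDATE")
                 .agg(F.sort_array(F.collect_list(per_event), asc=False)
                       .alias("EventsByArticles"),
                      F.sum("NumArticles").alias("TotalArticles")))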