0

我們正試圖在EMR中的火花上運行ETL。 S3中大約有2000萬個事件作爲gzipped json行。他們共約30個文件。我正在使用pyspark。使用sqlcontext進行並行查詢的火花

這是代碼,

def value_to_list(columns): 
    def value_map(values): 
     data = [] 
     for val in values: 
      d = val.asDict() 
      data.append([d[column] for column in columns]) 
     return data 

    return value_map 


def main(): 
    sc = SparkContext() 
    sql_context = SQLContenxt(sc) 

    all_events = SQLContenxt(sc).read.json("s3n://...", schema=StructType(fields), timestampFormat="yyyy-MM-dd HH:mm:ss") 
    all_events.registerTempTable('allevents') 

    for event_type in event_types: 
     process_event(sc, event_type, "allevents") 


def process_event(sparkcontext, etype, tablename): 
    query = "select app_id, source, type, {time_cols}, count(*) as total " \ 
      "from {table} where type = '{event_type}' " \ 
      "group by app_id, source, type, {time_cols}" 
    time_cols_spec = [('hour', 'day', 'month', 'year'), 
         ('day', 'month', 'year'), 
         ('month', 'year'), 
         ('year')] 

    for time_cols in time_cols_spec: 
     final_query = query.format(time_cols=", ".join(time_cols), 
            table=tablename, 
            event_type=etype) 
     dataframe = sql_context.sql(final_query) 

     dataframe.rdd.groupBy(lambda r: r['app_id'])\ 
      .mapValues(value_to_list(['source'] + time_cols))\ 
      .saveAsTextFile("s3n://...") 

所以,我們有大約30事件類型和每個事件我在小時,天,月,年4組合正在聚集。所以每個查詢4。我們總共有2000M的事件。

我正在與運行此上

  • AWS EMR(5.0.3)
  • 阿帕奇火花2.0.1
  • 1主,2工人
  • 各機器是m3.2xlarge
  • 總內存是90GB

問題是,最後保存我這需要很長的時間。我最後一次問,它花了14小時,2次組合和一個事件:(

我知道我不是平行的方式去,這個循環是連續的,而且有2個循環。但我希望rdd,groupBymapValues運行當我看到事件時間軸時,我看到它的saveAsTextFile佔用了99%的時間,可能是因爲它的火花執行遲緩

我需要使這個過程平行和快速,我該怎麼做?

回答

1

您可以應用4種主要優化:

  1. 您正在對未針對查詢進行優化的普通json文件執行聚合。將它們重寫爲實木複合地板,按事件類型重新分區並存儲在S3上 - 它們將佔用較少的空間,並且您的應用程序將獲得很好的速度提升。

  2. 增加並行性。沒有必要在這種強大的虛擬機上安裝一個驅動程序(master),而是生成一個較小的實例(例如m3.medium),併爲工作人員使用全部3個大實例。

  3. 用Dataframes替換RDD API調用:.rdd.groupBy().mapValues()可以替換爲.groupBy(dataframe.app_id).agg(collect_list()),然後進行一些映射。

  4. 您可以對(小時,日,月,年)數據集的原始數據執行查詢,然後使用此聚合來調用給定事件的所有剩餘查詢。

+0

這個答案給了我很多提示。這對我幫助很大。因此,我將此標記爲「答案」。 –