我們正試圖在EMR中的火花上運行ETL。 S3中大約有2000萬個事件作爲gzipped json行。他們共約30個文件。我正在使用pyspark。使用sqlcontext進行並行查詢的火花
這是代碼,
def value_to_list(columns):
def value_map(values):
data = []
for val in values:
d = val.asDict()
data.append([d[column] for column in columns])
return data
return value_map
def main():
sc = SparkContext()
sql_context = SQLContenxt(sc)
all_events = SQLContenxt(sc).read.json("s3n://...", schema=StructType(fields), timestampFormat="yyyy-MM-dd HH:mm:ss")
all_events.registerTempTable('allevents')
for event_type in event_types:
process_event(sc, event_type, "allevents")
def process_event(sparkcontext, etype, tablename):
query = "select app_id, source, type, {time_cols}, count(*) as total " \
"from {table} where type = '{event_type}' " \
"group by app_id, source, type, {time_cols}"
time_cols_spec = [('hour', 'day', 'month', 'year'),
('day', 'month', 'year'),
('month', 'year'),
('year')]
for time_cols in time_cols_spec:
final_query = query.format(time_cols=", ".join(time_cols),
table=tablename,
event_type=etype)
dataframe = sql_context.sql(final_query)
dataframe.rdd.groupBy(lambda r: r['app_id'])\
.mapValues(value_to_list(['source'] + time_cols))\
.saveAsTextFile("s3n://...")
所以,我們有大約30事件類型和每個事件我在小時,天,月,年4組合正在聚集。所以每個查詢4。我們總共有2000M的事件。
我正在與運行此上
- AWS EMR(5.0.3)
- 阿帕奇火花2.0.1
- 1主,2工人
- 各機器是
m3.2xlarge
- 總內存是90GB
問題是,最後保存我這需要很長的時間。我最後一次問,它花了14小時,2次組合和一個事件:(
我知道我不是平行的方式去,這個循環是連續的,而且有2個循環。但我希望rdd,groupBy
,mapValues
運行當我看到事件時間軸時,我看到它的saveAsTextFile
佔用了99%的時間,可能是因爲它的火花執行遲緩
我需要使這個過程平行和快速,我該怎麼做?
這個答案給了我很多提示。這對我幫助很大。因此,我將此標記爲「答案」。 –