2016-06-12 97 views
1

這是我的python-spark代碼的一部分,它的部分運行速度太慢,無法滿足我的需求。 特別是這部分代碼,我真的很想提高它的速度,但不知道如何去做。目前需要大約1分鐘的時間處理6000萬個數據行,我想將其提高到10秒以內。提高火花應用的速度

sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load() 

我的火花應用程序的更多背景:

article_ids = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="article_by_created_at", keyspace=source).load().where(range_expr).select('article','created_at').repartition(64*2) 

axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load() 
speed_df = article_ids.join(axes,article_ids.article==axes.article).select(axes.article,axes.at,axes.comments,axes.likes,axes.reads,axes.shares) \ 
    .map(lambda x:(x.article,[x])).reduceByKey(lambda x,y:x+y) \ 
    .map(lambda x:(x[0],sorted(x[1],key=lambda y:y.at,reverse = False))) \ 
    .filter(lambda x:len(x[1])>=2) \ 
    .map(lambda x:x[1][-1]) \ 
    .map(lambda x:(x.article,(x,(x.comments if x.comments else 0)+(x.likes if x.likes else 0)+(x.reads if x.reads else 0)+(x.shares if x.shares else 0))))  

非常感謝您的建議。

編輯:

計數佔據了大部分的時間(50歲)不參加

我也試圖與提高並行,但它並沒有任何明顯的效果:

sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load().repartition(number) 

sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source,numPartitions=number).load() 

This is the picture from spark showing how long each operation takes

+1

您確定它是負載,還是加入?加盟價格昂貴... –

+0

計數佔用大部分時間不加入,請參閱上面的我的更新。謝謝 – peter

+1

這個問題與[this](http://stackoverflow.com/a/37507116/1560062)有何不同? – eliasah

回答

4

首先,你應該弄清楚實際花費最多的時間。

例如確定多久,光看數據需要

axes = sqlContext 
    .read 
    .format("org.apache.spark.sql.cassandra") 
    .options(table="axes", keyspace=source) 
    .load() 
    .count() 

提高並行或並行讀者的數量可能會幫助這一點,但只有當你不杏你卡桑德拉集羣的IO。

其次,看看你是否可以用Dataframes API做所有事情。每次你使用python lambda時,你都會在python和scala類型之間產生序列化成本。

編輯:

sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load().repartition(number) 

將只需要裝載完成所以這不會幫助你後生效

sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source,numPartitions=number).load() 

對於Spark Cassandra連接器來說這不是一個有效的參數,所以這將不會執行任何操作。

請參閱 https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md#read-tuning-parameters 輸入分割大小決定放入Spark分區的C *分區數量。

+0

我在上面添加了一些細節我試圖在上面的編輯中使用這兩種方法增加parralellsim,但它沒有任何效果。你能否說明你在dateframes API中的意思?謝謝 – peter

+2

@Peter我爲您提供了一個只使用DataFrames的鏈接[上一次](http://stackoverflow.com/a/37507116/1560062)。 – zero323

+0

@ zero323我試着只使用dateframes,但dateframe似乎沒有keyby和reducebykey方法,因此看起來我需要回到使用RDD。當我嘗試時出現此錯誤消息:AttributeError:'DataFrame'對象沒有屬性'keyBy'。任何想法該怎麼辦?謝謝 – peter