
I have a cluster of 4 nodes: 3 Spark nodes and 1 Solr node. The CPUs are 8-core, the memory is 32 GB, and the storage is SSD. I use Cassandra as my database. My data volume is 22 GB after 6 hours, and I currently have around 3-4 million rows, which should be read in under 5 minutes. Why is my Spark Streaming application so slow?

But it already cannot finish the work within that time. My plan for the future is to read 100 million rows in under 5 minutes. I'm not sure what I could scale up or do better right now to reach the current target, let alone the future one. Is that even possible, or would it be better to use Spark only for the real-time analysis and use, for example, Hadoop for the longer-tail data (older than a day or a few hours)? Thanks a lot! Here is my Spark application code:

import sys 
import json 
from pyspark import SparkContext, SparkConf 
from pyspark.streaming import StreamingContext 
from pyspark.sql import SQLContext, Row 
from pyspark.streaming.kafka import KafkaUtils 
from datetime import datetime, timedelta 
from dateutil.parser import parse 
from cassandra.cluster import Cluster 
import pytz 
from dateutil.tz import tzutc 
tz = pytz.timezone('') 
appname = str(sys.argv[1]) 
source = str(sys.argv[2]) 
# Direct Cassandra session, used for the DELETE statements in joinstream()
cluster = Cluster(['localhost'])
session_statis = cluster.connect('keyspace')
def read_json(x):
    # Parse a JSON string from Kafka; return 0 on malformed input so it can
    # be filtered out downstream.
    try:
        y = json.loads(x)
    except:
        y = 0
    return y
def TransformInData(x):
    # Pull the list of articles out of the message body; return 0 on failure.
    try:
        body = json.loads(x['body'])
        return (body['articles'])
    except:
        return 0
def axesTransformData(x):
    # Pull the engagement ("axes") payload out of the message body; return 0 on failure.
    try:
        body = json.loads(x['body'])
        return (body)
    except:
        return 0
# Deduplicate incoming articles against rows already in Cassandra, then write
# the new rows to the article table and its schedule / lookup tables.
def storeDataToCassandra(rdd):
    rdd_cassandra =rdd.map(lambda x:(x[0],(x[0],x[1]['thumbnail'], x[1]['title'], x[1]['url'], datetime.strptime(parse(x[1]['created_at']).strftime('%Y-%m-%d %H:%M:%S'), "%Y-%m-%d %H:%M:%S"),source, x[1]['category'] if x[1]['category'] else '', x[1]['channel'],x[1]['genre']))) \ 
          .subtract(articles) 
    rdd_article = rdd_cassandra.map(lambda x:Row(id=x[1][0],source=x[1][5],thumbnail=x[1][1],title=x[1][2],url=x[1][3],created_at=x[1][4],category=x[1][6],channel=x[1][7],genre=x[1][8])) 
    rdd_schedule = rdd_cassandra.map(lambda x:Row(source=x[1][5],type='article',scheduled_for=x[1][4]+timedelta(minutes=5),id=x[1][0])) 
    rdd_article_by_created_at = rdd_cassandra.map(lambda x:Row(source=x[1][5],created_at=x[1][4],article=x[1][0])) 
    rdd_article_by_url = rdd_cassandra.map(lambda x:Row(url=x[1][3],article=x[1][0])) 
    if rdd_article.count()>0: 
     result_rdd_article = sqlContext.createDataFrame(rdd_article) 
     result_rdd_article.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace = "keyspace").save(mode ="append") 
    if rdd_schedule.count()>0: 
     result_rdd_schedule = sqlContext.createDataFrame(rdd_schedule) 
     result_rdd_schedule.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace = "keyspace").save(mode ="append") 
    if rdd_article_by_created_at.count()>0: 
     result_rdd_article_by_created_at = sqlContext.createDataFrame(rdd_article_by_created_at) 
     result_rdd_article_by_created_at.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace = "keyspace").save(mode ="append") 
    if rdd_article_by_url.count()>0: 
     result_rdd_article_by_url = sqlContext.createDataFrame(rdd_article_by_url) 
     result_rdd_article_by_url.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace = "keyspace").save(mode ="append") 
# Persist the per-article engagement metrics (comments, likes, shares) for this batch.
def axesStoreToCassandra(rdd):
    axes_rdd = rdd.map(lambda x:Row(article=x[1]['id'],at=datetime.now(),comments=x[1]['comments'],likes=x[1]['attitudes'],reads=0,shares=x[1]['reposts'])) 
    if axes_rdd.count()>0: 
     result_axes_rdd = sqlContext.createDataFrame(axes_rdd) 
     result_axes_rdd.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace = "keyspace").save(mode ="append") 



# Join article metadata with channel info and the latest engagement snapshots,
# derive a share "speed" per article, and rewrite the per-timespan statistics tables.
def joinstream(rdd):
    article_channels = articlestat.join(channels).map(lambda x:(x[1][0]['id'],{'id':x[1][0]['id'],'thumbnail':x[1][0]['thumbnail'],'title':x[1][0]['title'],'url':x[1][0]['url'],'created_at':x[1][0]['created_at'],'source':x[1][0]['source'],'genre':x[1][0]['genre'],'category':x[1][1]['category'],'author':x[1][1]['author']})) 
    speed_rdd = axes.map(lambda x:(x.article,[[x.at,x.comments,x.likes,x.reads,x.shares]])) \ 
       .reduceByKey(lambda x,y:x+y) \ 
       .map(lambda x:(x[0],sorted(x[1],key=lambda y:y[0],reverse = True)[0],sorted(x[1],key=lambda y:y[0],reverse = True)[1]) if len(x[1])>=2 else (x[0],sorted(x[1],key=lambda y:y[0],reverse = True)[0],[sorted(x[1],key=lambda y:y[0],reverse = True)[0][0]-timedelta(seconds=300),0,0,0,0])) \ 
       .filter(lambda x:(x[1][0]-x[2][0]).seconds>0) \ 
       .map(lambda x:(x[0],{'id':x[0],'comments':x[1][1],'likes':x[1][2],'reads':x[1][3],'shares':x[1][4],'speed':int(5*288*((x[1][4]-x[2][4])/((x[1][0]-x[2][0]).seconds/60.0)))})) \ 
       .filter(lambda x:x[1]['speed']>=0) \ 
       .filter(lambda x:x[1]['shares']>0) 
    statistics = article_channels.join(speed_rdd) \ 
       .map(lambda x:{'id':x[1][0]['id'],'thumbnail':x[1][0]['thumbnail'],'title':x[1][0]['title'],'url':x[1][0]['url'],'created_at':x[1][0]['created_at'],'source':x[1][0]['source'],'category':x[1][0]['category'],'author':x[1][0]['author'],'genre':x[1][0]['genre'],'comments':x[1][1]['comments'],'likes':x[1][1]['likes'],'reads':x[1][1]['reads'],'shares':x[1][1]['shares'],'speed':x[1][1]['speed']}) 
    timeone=datetime.now()-timedelta(hours=1) 
    timethree = datetime.now()-timedelta(hours=3) 
    timesix = datetime.now()-timedelta(hours=6) 
    timetwelve = datetime.now()-timedelta(hours=12) 
    timetwentyfour = datetime.now()-timedelta(hours=24) 
    result1 = statistics.filter(lambda x:x['created_at']+timedelta(hours=8)>=timeone).map(lambda x:Row(timespan='1',source=source,id=x['id'],title=x['title'],thumbnail=x['thumbnail'],url=x['url'],created_at=x['created_at']+timedelta(hours=8),genre=x['genre'],reads=0,likes=x['likes'],comments=x['comments'],shares=x['shares'],speed=x['speed'],category=x['category'],author=x['author'])) 
    result3 = statistics.filter(lambda x:x['created_at']+timedelta(hours=8)>=timethree and x['created_at']+timedelta(hours=8)<=timeone).map(lambda x:Row(timespan='3',source=source,id=x['id'],title=x['title'],thumbnail=x['thumbnail'],url=x['url'],created_at=x['created_at']+timedelta(hours=8),genre=x['genre'],reads=0,likes=x['likes'],comments=x['comments'],shares=x['shares'],speed=x['speed'],category=x['category'],author=x['author'])) 
    result6 = statistics.filter(lambda x:x['created_at']+timedelta(hours=8)>=timesix and x['created_at']+timedelta(hours=8)<=timethree).map(lambda x:Row(timespan='6',source=source,id=x['id'],title=x['title'],thumbnail=x['thumbnail'],url=x['url'],created_at=x['created_at']+timedelta(hours=8),genre=x['genre'],reads=0,likes=x['likes'],comments=x['comments'],shares=x['shares'],speed=x['speed'],category=x['category'],author=x['author'])) 
    result12 = statistics.filter(lambda x:x['created_at']+timedelta(hours=8)>=timetwelve and x['created_at']+timedelta(hours=8)<=timesix).map(lambda x:Row(timespan='12',source=source,id=x['id'],title=x['title'],thumbnail=x['thumbnail'],url=x['url'],created_at=x['created_at']+timedelta(hours=8),genre=x['genre'],reads=0,likes=x['likes'],comments=x['comments'],shares=x['shares'],speed=x['speed'],category=x['category'],author=x['author'])) 
    result24 = statistics.filter(lambda x:x['created_at']+timedelta(hours=8)>=timetwentyfour and x['created_at']+timedelta(hours=8)<=timetwelve).map(lambda x:Row(timespan='24',source=source,id=x['id'],title=x['title'],thumbnail=x['thumbnail'],url=x['url'],created_at=x['created_at']+timedelta(hours=8),genre=x['genre'],reads=0,likes=x['likes'],comments=x['comments'],shares=x['shares'],speed=x['speed'],category=x['category'],author=x['author'])) 
    if result1.count()>0: 
     session_statis.execute('DELETE FROM tablename WHERE source = %s and timespan= %s', (source,'1')) 
     resultschema1 = sqlContext.createDataFrame(result1) 
     resultschema1.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace = "keyspace").save(mode ="append") 
    if result3.count()>0: 
     session_statis.execute('DELETE FROM tablename WHERE source = %s and timespan= %s', (source,'3')) 
     resultschema3 = sqlContext.createDataFrame(result3) 
     resultschema3.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace = "keyspace").save(mode ="append") 

    if result6.count()>0: 
     session_statis.execute('DELETE FROM tablename WHERE source = %s and timespan= %s', (source,'6')) 
     resultschema6 = sqlContext.createDataFrame(result6) 
     resultschema6.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace = "keyspace").save(mode ="append") 

    if result12.count()>0: 
     session_statis.execute('DELETE FROM tablename WHERE source = %s and timespan= %s', (source,'12')) 
     resultschema12 = sqlContext.createDataFrame(result12) 
     resultschema12.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace = "keyspace").save(mode ="append") 

    if result24.count()>0: 
     session_statis.execute('DELETE FROM tablename WHERE source = %s and timespan= %s', (source,'24')) 
     resultschema24 = sqlContext.createDataFrame(result24) 
     resultschema24.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace = "keyspace").save(mode ="append") 
# Spark and Spark Streaming setup (30-second micro-batches)
conf = SparkConf().setAppName(appname)
sc = SparkContext(conf=conf) 
ssc = StreamingContext(sc,30) 
sqlContext = SQLContext(sc) 
# Reference data read from Cassandra once at startup (channels, existing articles, engagement rows)
channels = sc.cassandraTable("keyspace","tablename").map(lambda x:(x.id,{'author':x.name,'category':x.category}))
articles = sc.cassandraTable('keyspace','tablename').map(lambda x:(x.id,(x.id,x.thumbnail,x.title,x.url,x.created_at+timedelta(hours=8),source,x.category,x.channel,x.genre))) 
articlestat = sc.cassandraTable('keyspace','tablename').map(lambda x:(x.channel,{'id':x.id,'thumbnail':x.thumbnail,'title':x.title,'url':x.url,'created_at':x.created_at,'source':x.source,'category':x.category,'channel':x.channel,'genre':x.genre})) 
axes = sc.cassandraTable('keyspace','tablename') 
# Kafka direct streams: 'topic1' carries article metadata, 'topic2' carries engagement updates
topic = 'topic1'
kafkaParams = {"metadata.broker.list": "localhost:9092"} 
article_stream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams) 
article_join_stream=article_stream.map(lambda x:read_json(x[1])).filter(lambda x: x!=0).map(lambda x:TransformInData(x)).filter(lambda x: x!=0).flatMap(lambda x:(a for a in x)).map(lambda x:(x['id'].encode("utf-8") ,x)) 
article_join_stream.transform(storeDataToCassandra).pprint() 
axes_topic = 'topic2' 
axes_stream = KafkaUtils.createDirectStream(ssc, [axes_topic], kafkaParams) 
axes_join_stream = axes_stream.map(lambda x:read_json(x[1])).filter(lambda x: x!=0).map(lambda x:axesTransformData(x)).filter(lambda x: x!=0).flatMap(lambda x:(a for a in x)).map(lambda x:(str(x['id']),x)) 
axes_join_stream.transform(axesStoreToCassandra).pprint() 
# Every 15 minutes, recompute the per-timespan statistics over a 15-minute window of article ids
statistics = article_join_stream.map(lambda x:(x[0])).window(15*60,15*60)
statistics.transform(joinstream).pprint() 
ssc.start()
ssc.awaitTermination()  # keep the driver alive while the streaming job runs

Edit: here is the stage that seems to consume most of the time. Any thoughts on it?

[Screenshot of the Spark UI stage detail]


Have you taken a look at http:// :4040/stages/ to see which operation takes the most time? – Marco


I added the stage that takes the longest, but I'm not sure what it corresponds to in the code. – peter


Using Python lambdas is always quite expensive, because everything has to be serialized into and out of Python. – RussS

Answer


At first glance, it seems you are simply running "spark-submit <your application>".

That means your application is running with the default memory and CPU allocation (in most default setups that is roughly 1 CPU and 512 MB of RAM to launch your application).

This is assuming you are using YARN, since you did not provide that information.

Launch your application with appropriate resources and you should see an improvement.
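For example, a submission along these lines makes the allocation explicit (just a sketch: the master URL, memory sizes and core counts are placeholders, not values tuned for this workload):

spark-submit \
  --master spark://<master-host>:7077 \
  --driver-memory 2G \
  --executor-memory 4G \
  --total-executor-cores 6 \
  your_application.py appname source

--total-executor-cores applies to a standalone or Mesos master; under YARN the equivalent knobs are --num-executors and --executor-cores.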

Edit:

I see you are using a lot of lambda expressions, and those need to be serialized. Be aware that whenever an object is used inside one, the entire object gets passed around.

I.e. you end up shipping the full object just to use this.value, not just value. To fix that, you can copy it into a local variable, _value = this.value, and use that from then on.

That could give you a speedup.
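A minimal PySpark sketch of that pattern (the class and attribute names here are made up for illustration and are not taken from the code above):

class ArticleScorer(object):
    def __init__(self, threshold):
        self.threshold = threshold

    def filter_slow(self, rdd):
        # Referencing self inside the lambda captures the whole ArticleScorer
        # instance, so it gets pickled and shipped with every task.
        return rdd.filter(lambda x: x['shares'] > self.threshold)

    def filter_fast(self, rdd):
        # Copy the attribute into a local variable first; only that small
        # value is captured by the closure and serialized.
        threshold = self.threshold
        return rdd.filter(lambda x: x['shares'] > threshold)

The same trick applies whenever a lambda only needs one field of a large driver-side object.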


This is the command I use to submit the job. I don't have YARN, since I'm using DataStax. screen -dmS spark dse spark-submit --master spark://localhost:7077 --executor-memory 2G --driver-memory 1G --total-executor-cores 6 --packages org.apache.spark:spark-streaming-kafka_2.10:1.4.1 --jars /root/spark-streaming-kafka_2.10-1.4.1.jar /root/pythonspark/com/spark/application.py appcationname source – peter
