I have tens of millions of rows of data. Is it possible to analyse all of it with Spark Streaming within a week or a day? What are the limits of Spark Streaming in terms of data volume? I am not sure what the upper bound is, and at what point I should instead write the data to my database, because at some point the stream will probably no longer be able to handle it. I also have different time windows (1, 3, 6 hours, etc.) and I use window operations to separate the data.
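To make the volume question concrete: the job below uses 300-second batches, and the only rate control I am aware of for the direct Kafka stream is the pair of settings sketched here (the rate value is a placeholder, not something I have validated), applied to the same SparkConf as in the job:

# placeholders for illustration; applied to the SparkConf created below
conf.set("spark.streaming.kafka.maxRatePerPartition", "1000")  # cap on records per second pulled from each Kafka partition
conf.set("spark.streaming.backpressure.enabled", "true")       # Spark 1.5+: adapt the intake rate to the processing speed

Is capping the per-batch intake like this the right way to think about the limit, or is there a hard ceiling I should plan around? The job itself: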
from pyspark import SparkConf
from pyspark.sql import SQLContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark_cassandra import CassandraSparkContext  # from the pyspark-cassandra package, needed for cassandraTable()

appname = "abc_streaming"  # placeholder application name
conf = SparkConf().setAppName(appname)
sc = CassandraSparkContext(conf=conf)
ssc = StreamingContext(sc, 300)  # 300-second (5-minute) batch interval
sqlContext = SQLContext(sc)
channels = sc.cassandraTable("abc", "channels")  # channel metadata from Cassandra (not referenced further in this snippet)
topic = 'abc.crawled_articles'
kafkaParams = {"metadata.broker.list": "0.0.0.0:9092"}
category = 'abc.crawled_article'
category_stream = KafkaUtils.createDirectStream(ssc, [category], kafkaParams)
# read_json, categoryTransform, TransformInData, axesTransformData and joinstream are helper functions defined elsewhere
category_join_stream = category_stream.map(lambda x: read_json(x[1])).filter(lambda x: x != 0).map(lambda x: categoryTransform(x)).filter(lambda x: x != 0).map(lambda x: (x['id'], x))
article_stream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams)
article_join_stream = (article_stream
    .map(lambda x: read_json(x[1]))
    .filter(lambda x: x != 0)
    .map(lambda x: TransformInData(x))
    .filter(lambda x: x != 0)
    .flatMap(lambda x: (a for a in x))
    .map(lambda x: (x['id'].encode("utf-8"), x)))
# axes topic: engagement metrics (likes, comments, ...) to be joined with the articles
axes_topic = 'abc.crawled_axes'
axes_stream = KafkaUtils.createDirectStream(ssc, [axes_topic], kafkaParams)
axes_join_stream = (axes_stream
    .filter(lambda x: 'delete' not in str(x))
    .map(lambda x: read_json(x[1]))
    .filter(lambda x: x != 0)
    .map(lambda x: axesTransformData(x))
    .filter(lambda x: x != 0)
    .map(lambda x: (str(x['id']), x))
    .map(lambda x: (x[0], {'id': x[0], 'attitudes': x[1]['likes'], 'reposts': 0,
                           'comments': x[1]['comments'], 'speed': x[1]['comments']})))
#axes_join_stream.reduceByKeyAndWindow(lambda x, y: x + y, 30, 10).transform(axestrans).pprint()
# join the three streams by id: articles (1 h window, 5 min slide), categories (1 h window), axes (24 h window); window and slide durations must be multiples of the 300 s batch interval
statistics = article_join_stream.window(1*60*60, 5*60).cogroup(category_join_stream.window(1*60*60, 5*60)).cogroup(axes_join_stream.window(24*60*60, 5*60))
statistics.transform(joinstream).pprint()
ssc.start()  # start the computation
ssc.awaitTermination()  # wait for the streaming job to terminate
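For the "when should I write to the database" part, this is roughly what I have in mind: persisting each joined window from foreachRDD with the DataStax Python driver (cassandra-driver). This is only a sketch: the article_statistics table and its columns are made up, and it assumes joinstream emits (id, dict) pairs with the fields used in the axes dictionary above.

from cassandra.cluster import Cluster

def save_partition(records):
    # one connection per partition; keyspace "abc" is real, the table and its schema are hypothetical
    cluster = Cluster(["127.0.0.1"])
    session = cluster.connect("abc")
    insert = session.prepare(
        "INSERT INTO article_statistics (id, attitudes, comments, speed) VALUES (?, ?, ?, ?)")
    for article_id, stats in records:
        session.execute(insert, (article_id, stats["attitudes"], stats["comments"], stats["speed"]))
    cluster.shutdown()

# instead of (or next to) pprint():
statistics.transform(joinstream).foreachRDD(lambda rdd: rdd.foreachPartition(save_partition))

If the stream cannot keep up, this is the point where I would expect to fall back to writing the raw records to the database and running the windowed analysis as a batch job instead.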
There are quite a few questions here; it would help to get answers if you separated them clearly. It would also help if you reduced the included code to a minimal sample that is just enough to illustrate the problem. – etov