我已經在python中編寫了一個可正常工作的Spark程序。檢查RDD中是否存在值
但是,它在內存消耗方面效率低下&我正試圖優化它。我在AWS EMR上運行它,EMR正在消耗太多內存的工作。
Lost executor 11 on ip-*****: Container killed by YARN for exceeding memory limits. 11.4 GB of 10.4 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
我相信這個內存問題是由於這樣的事實,我在幾種情況下收集我的RDDS((),即使用.collect),因爲在後期階段,我需要測試,如果在列表中存在一定的價值是否由這些RDD製成。
所以,目前我的代碼看起來像這樣:
myrdd = data.map(lambda word: (word,1)) \
.reduceByKey(lambda a,b: a+b) \
.filter(lambda (a, b): b >= 5) \
.map(lambda (a,b) : a) \
.collect()
,並在代碼
if word in myrdd:
mylist.append(word)
myrdd2 = data2.map(lambda word: (word,1)) \
.reduceByKey(lambda a,b: a+b) \
.filter(lambda (a, b): b >= 5) \
.map(lambda (a,b) : a) \
.collect()
if word in myrdd2:
mylist2.append(word)
,然後我重複這種模式多次晚些時候。
有沒有辦法做到操作
if word in myrdd:
do something
不先收集RDD?
是否有像rdd.contains()這樣的函數?
P.S:我沒有在內存中緩存任何東西。我的火花背景是這樣的:從紗線
jobName = "wordcount"
sc = SparkContext(appName = jobName)
......
......
sc.stop()
不使用.collect ()它會把所有的數據傳給驅動程序,如果你有更大的數據集,會產生一個問題。使用myrdd2.foreachRDD並檢查值是否存在 – Backtrack
word = sc.broadcast([w1,w2,w3]) valuepresent = myrdd.filter {lambda x:x in word}這樣做也是一種解決方法,我會認爲 – Backtrack