2015-05-29 41 views
10

當我使用「++」來合併很多RDD時,我得到了錯誤堆棧溢出錯誤。Spark當聯合很多RDD引發堆棧溢出錯誤

Spark版本1.3.1 環境:yarn-client。 - 驅動器內存8G

RDD的數量超過4000個。每個RDD都從大小爲1 GB的文本文件中讀取。

正是在這種方式

val collection = (for (
    path <- files 
) yield sc.textFile(path)).reduce(_ union _) 

它正常工作時files具有體積小生成。 並且有錯誤

錯誤重複出現。我想這是一個被稱爲太多時間的遞歸函數?

Exception at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) 
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) 
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) 
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) 
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) 
    at scala.Option.getOrElse(Option.scala:120) 
    ..... 
+0

輸入數據的大小是多少? – eliasah

+0

默認情況下,Spark僅使用內存RDD序列化。如果不適合,您可能需要嘗試使用磁盤上的永久磁盤選項。 – C4stor

+0

@eliasah問題已更新。但是投入的大小如何? – worldterminator

回答

14

改爲使用SparkContext.union(...)代替一次結合多個RDD。

因爲RDD.union()爲每個RDD在譜系(任何計算中額外的一組堆棧框)創建了一個新步驟,所以您不希望每次都這樣做,而SparkContext.union ()一次完成。這將確保不會發生堆棧溢出錯誤。

+0

我完全同意,但我只是想知道如果它確保沒有得到堆棧溢出錯誤? – eliasah

+1

是的,因爲'RDD.union()'在每個RDD的lineage(一組額外的堆棧框架)中創建了一個新的步驟,而'SparkContext.union()'一次完成。 –

+0

謝謝!我將編輯您的答案,並在您的評論中添加您提到的信息。我相信它完成了答案。 – eliasah

0

看來,當聯合RDD一個接一個可以進入一系列非常長的遞歸函數調用。 在這種情況下,我們需要增加JVM堆棧內存。 在選項--driver-java-options "-Xss 100M"的火花中,驅動程序jvm堆棧內存配置爲100M。

肖恩歐文的解決方案也以更優雅的方式解決了問題。