2014-06-17 46 views
1

If an RDD object has non-empty .dependencies, does that mean it has lineage? How can I remove it? StackOverflowError when doing iterative computation with Apache Spark

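I am doing an iterative computation in which each iteration depends on the result computed in the previous iteration. After many iterations it throws a StackOverflowError.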

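At first I tried to use cache. I read the code of pregel.scala, which is part of GraphX; it calls cache on the object and then materializes it with a count. But when I attached a debugger, it seemed that this approach does not empty .dependencies, and it did not work in my code either.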
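The observation above can be checked without a debugger. The following is a minimal sketch, assuming a local Spark installation on the classpath; the object `CacheKeepsLineage` and the helpers `depth` and `lineageDepth` are hypothetical names introduced only for illustration. It applies a cached `map` step repeatedly and measures how long the dependency chain behind the final RDD is. Since `cache` only stores the computed partitions and leaves `.dependencies` in place, the chain is expected to grow by one per iteration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CacheKeepsLineage {
  // Hypothetical helper: length of the longest dependency chain behind an RDD.
  def depth(rdd: RDD[_]): Int =
    1 + rdd.dependencies.map(d => depth(d.rdd)).foldLeft(0)((a, b) => math.max(a, b))

  // Runs `iterations` cached map steps and returns the lineage depth of the last RDD.
  def lineageDepth(iterations: Int): Int = {
    val sc = new SparkContext(
      new SparkConf().setAppName("cache-keeps-lineage").setMaster("local[2]"))
    try {
      var rdd: RDD[Int] = sc.parallelize(1 to 10)
      for (_ <- 1 to iterations) {
        val next = rdd.map(_ + 1).cache()
        next.count()                    // materialize the cached RDD
        rdd.unpersist(blocking = false) // drop the previous iteration's cache
        rdd = next
      }
      depth(rdd)
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit =
    println(s"lineage depth after 5 iterations: ${lineageDepth(5)}")
}
```

Calling `rdd.toDebugString` on the final RDD is another way to look at the same chain.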

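Another option is to use checkpoint. I tried to checkpoint the vertices and edges of my Graph object and then materialize them by calling count on both. Then I used .isCheckpointed to check whether the checkpoint had succeeded, but it always returns false.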
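For comparison, here is a minimal sketch of checkpointing in an iterative loop on a plain RDD, not the asker's Graph code. It assumes a local Spark installation; `PeriodicCheckpoint`, `run`, and the `every` parameter are hypothetical names. Two details matter in this sketch: a checkpoint directory is set with `sc.setCheckpointDir` before anything is checkpointed, and `checkpoint()` is called before the first action on the RDD, so that the action that follows actually writes the checkpoint.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object PeriodicCheckpoint {
  // Runs `iterations` map steps, checkpointing every `every` steps.
  // Returns whether the final RDD reports itself as checkpointed.
  def run(iterations: Int, every: Int): Boolean = {
    val sc = new SparkContext(
      new SparkConf().setAppName("periodic-checkpoint").setMaster("local[2]"))
    try {
      // Assumption: a temporary local directory is good enough for a local test run.
      sc.setCheckpointDir(Files.createTempDirectory("spark-checkpoint").toString)
      var rdd: RDD[Int] = sc.parallelize(1 to 10)
      for (i <- 1 to iterations) {
        val next = rdd.map(_ + 1).cache()
        if (i % every == 0) next.checkpoint() // mark before the first action on `next`
        next.count()                          // the action that triggers the checkpoint
        rdd.unpersist(blocking = false)
        rdd = next
      }
      rdd.isCheckpointed
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit =
    println(s"final RDD checkpointed: ${run(iterations = 20, every = 10)}")
}
```

Checkpointing on every iteration would also cut the lineage, but each checkpoint writes the data to storage, so a fixed interval is a common compromise.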

Update: I wrote a simplified version of the code that reproduces the problem.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}

def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("HDTM")
        .setMaster("local[4]")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryo.registrator", "edu.nd.dsg.hdtm.util.HDTMKryoRegistrator")
    val sc = new SparkContext(conf)

    val v = sc.parallelize(Seq[(VertexId, Long)]((0L, 0L), (1L, 1L), (2L, 2L)))
    val e = sc.parallelize(Seq[Edge[Long]](Edge(0L, 1L, 0L), Edge(1L, 2L, 1L), Edge(2L, 0L, 2L)))
    val newGraph = Graph(v, e)
    var currentGraph = newGraph
    val vertexIds = currentGraph.vertices.map(_._1).collect()

    for (i <- 1 to 1000) {
        var g = currentGraph
        vertexIds.toStream.foreach(id => {
            // Rebuild the graph from the current vertices and edges, then cache and materialize it
            g = Graph(currentGraph.vertices, currentGraph.edges)
            g.cache()
            g.edges.cache()
            g.vertices.cache()
            g.vertices.count()
            g.edges.count()
        })

        // Release the previous iteration's graph before moving on
        currentGraph.unpersistVertices(blocking = false)
        currentGraph.edges.unpersist(blocking = false)
        currentGraph = g
        println(" iter " + i + " finished")
    }
}

Update

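Below is the code. I removed most of the unnecessary methods to keep the number of lines to a minimum, so it may not make much sense if you think about what it is supposed to do.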

import scala.reflect.ClassTag
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Graph, VertexId}

object StackOverFlow {
    final val PATH = "./"

    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("HDTM")
            .setMaster("local[4]")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .set("spark.kryo.registrator", "edu.nd.dsg.hdtm.util.HDTMKryoRegistrator")
        val sc = new SparkContext(conf)
        val filePath = PATH + "src/test/resources/binary.txt"
        val wikiGraph: Graph[WikiDataVertex, Double] = WikiGraphLoader.loadGraphFromTestHDTMFile(sc, filePath)
        wikiGraph.cache()
        val root = 0L
        val bfsGraph = GraphAlgorithm.initializeGraph(wikiGraph, root, sc)
        bfsGraph.cache()
        val vertexIds = bfsGraph.vertices.map(_._1).collect()
        var currentGraph = bfsGraph

        for (i <- 1 to 1000) {
            var g = currentGraph
            // Each call builds a new graph on top of the previous one
            vertexIds.toStream.foreach(id => {
                g = samplePath(g, id, root)
            })

            currentGraph.unpersistVertices(blocking = false)
            currentGraph.edges.unpersist(blocking = false)
            currentGraph = g
            println(" iter " + i + " finished")
        }
    }

    def samplePath[ED: ClassTag](graph: Graph[WikiDataVertex, ED],
                                 instance: VertexId, root: VertexId): Graph[WikiDataVertex, ED] = {

        if (instance == 0L) return graph

        val (removedGraph, remainedGraph) = splitGraph(graph, instance)

        /**
         * Some code is omitted here; it changes the attributes of removedGraph and remainedGraph.
         */

        val newVertices = graph.outerJoinVertices(removedGraph.vertices ++ remainedGraph.vertices)({
            (vid, vd, opt) => {
                opt.getOrElse(vd)
            }
        }).vertices

        val newEdges = graph.edges.map(edge => {
            if (edge.dstId == instance)
                edge.copy(srcId = edge.srcId)
                // In the real case edge.srcId is replaced by a vertexId calculated by other functions
            else
                edge.copy()
        })

        val g = Graph(newVertices, newEdges)
        g.vertices.cache()
        g.edges.cache()
        g.cache()
        g.vertices.count()
        g.edges.count()

        remainedGraph.unpersistVertices(blocking = false)
        remainedGraph.edges.unpersist(blocking = false)
        removedGraph.unpersistVertices(blocking = false)
        removedGraph.edges.unpersist(blocking = false)

        g
    }

    /**
     * Split a graph into two sub-graphs around the vertex `instance`.
     * Edges that end at `instance` are lost.
     * @param graph Graph that will be separated
     * @param instance Vertex that we are using to separate the graph
     * @tparam ED Edge type
     * @return (sub-graph with `instance`, sub-graph without `instance`)
     */
    def splitGraph[ED: ClassTag]
    (graph: Graph[WikiDataVertex, ED], instance: VertexId): (Graph[WikiDataVertex, ED], Graph[WikiDataVertex, ED]) = {
        val nGraph = GraphAlgorithm.graphWithOutDegree(graph)
        // This is needed twice, so cache it to prevent re-computation
        nGraph.cache()

        val wGraph = nGraph.subgraph(
            epred = e => e.dstAttr._1.path.contains(instance) ||
                e.srcAttr._1.path.contains(instance),
            vpred = (id, vd) => vd._1.path.contains(instance))

        val woGraph = nGraph.subgraph(
            epred = e => !e.dstAttr._1.path.contains(instance) &&
                !e.srcAttr._1.path.contains(instance),
            vpred = (id, vd) => !vd._1.path.contains(instance))

        val removedGraph = Graph(wGraph.vertices.mapValues(_._1), wGraph.edges, null)
        val remainedGraph = Graph(woGraph.vertices.mapValues(_._1), woGraph.edges, null)

        removedGraph.vertices.count()
        removedGraph.edges.count()
        removedGraph.cache()
        remainedGraph.vertices.count()
        remainedGraph.edges.count()
        remainedGraph.cache()

        nGraph.unpersistVertices(blocking = false)
        nGraph.edges.unpersist(blocking = false)

        (removedGraph, remainedGraph)
    }

} 

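For the first 10 iterations it runs fast; after that, each iteration takes a little longer. I checked the Spark WebUI: the actual execution time of each operation stays almost the same, but the Scheduler Delay grows as the number of iterations increases. After 20 hours of iterations it throws a StackOverflowError.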

+1

Can you post your code? It seems the `StackOverflowError` is not related to RDD lineage. – cloud

+1

The StackOverflowError may be caused by some non-terminating recursion rather than by RDD lineage. Yes, please show some code. – maasg

+0

@maasg I have attached the code, thanks for your help! – bxshi

Answers

1
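val g = loadEdgeFile(sc, edge_pt, n_partition)

// Run a no-op job over every partition of the edges and vertices
g.edges.foreachPartition(_ => ())
g.vertices.foreachPartition(_ => ())

g.checkpoint()

// Materialize both RDDs again so that the checkpoint is actually written
g.edges.foreachPartition(_ => ())
g.vertices.foreachPartition(_ => ())
println(s"is cp: ${g.isCheckpointed}")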

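For a graph to be checkpointed, three conditions should be met:

  • the graph has not been materialized before;
  • then you checkpoint it;
  • you materialize both the vertices and the edges.

If you then check the graph's status, you will get true.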
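The three conditions can be sketched as a self-contained program. This is only an illustration under stated assumptions: it uses a small in-memory graph instead of the answer's `loadEdgeFile`, it needs a Spark version whose GraphX provides `Graph.checkpoint()` and `Graph.isCheckpointed`, and the names `GraphCheckpointSketch` and `run` are hypothetical. It also sets a checkpoint directory first, which the snippet above does not show.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}

object GraphCheckpointSketch {
  // Builds a three-vertex graph, checkpoints it, and returns its checkpoint status.
  def run(): Boolean = {
    val sc = new SparkContext(
      new SparkConf().setAppName("graph-checkpoint-sketch").setMaster("local[2]"))
    try {
      // Assumption: a temporary local directory is good enough for a local test run.
      sc.setCheckpointDir(Files.createTempDirectory("graph-checkpoint").toString)

      val vertices = sc.parallelize(Seq((0L, 0L), (1L, 1L), (2L, 2L)))
      val edges = sc.parallelize(Seq(Edge(0L, 1L, 0L), Edge(1L, 2L, 1L), Edge(2L, 0L, 2L)))

      val g = Graph(vertices, edges) // condition 1: nothing has been materialized yet
      g.checkpoint()                 // condition 2: mark the graph for checkpointing
      g.vertices.count()             // condition 3: materialize the vertices ...
      g.edges.count()                // ... and the edges
      g.isCheckpointed
    } finally sc.stop()
  }

  def main(args: Array[String]): Unit =
    println(s"is checkpointed: ${run()}")
}
```

If the conditions hold as the answer describes, `run()` should return true.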