If an RDD object has non-empty `.dependencies`, does that mean it has lineage? How can I remove it?
StackOverflowError when doing iterative computation with Apache Spark
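I am doing an iterative computation in which each iteration depends on the results computed in the previous one. After many iterations, it throws a `StackOverflowError`.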
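To see why the error shows up only after many iterations, here is a toy model in plain Scala (the `Node` class and `LineageToy` object are hypothetical, not Spark classes). Each iteration wraps the previous result, so anything that walks the chain recursively, roughly what happens when Spark serializes an RDD together with its dependencies, needs stack space proportional to the number of iterations. Periodically dropping the parent link stands in for what checkpointing does.

```scala
// Toy stand-in for an RDD lineage node (hypothetical; not Spark's RDD class).
final class Node(val id: Int, val parent: Option[Node])

object LineageToy {
  // Recursive walk down the parent chain: one stack frame per node.
  def depth(n: Node): Int = n.parent match {
    case None    => 1
    case Some(p) => 1 + depth(p)
  }

  // Builds the chain iteratively, one node per iteration.
  // Every `cutEvery` iterations the parent link is dropped, mimicking a
  // checkpoint that replaces the lineage with saved data (0 = never cut).
  def build(iterations: Int, cutEvery: Int): Node = {
    var cur = new Node(0, None)
    for (i <- 1 to iterations) {
      cur = new Node(i, Some(cur))
      if (cutEvery > 0 && i % cutEvery == 0) cur = new Node(i, None)
    }
    cur
  }
}
```

Without cuts, the depth after 1000 iterations is 1001; cutting every 30 iterations bounds it to at most 30 (11 at iteration 1000, since the last cut happens at iteration 990).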
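At first I tried `cache`. I read the code of `pregel.scala`, which is part of GraphX: after calling `cache`, they materialize the object with `count`. But when I attach a debugger, this approach does not seem to empty `.dependencies`, and it does not work in my code either.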
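The debugger observation matches how Spark treats `cache`: it stores the computed partitions but deliberately keeps the lineage, so lost partitions can be recomputed. A minimal sketch, assuming a local `SparkContext` (`CacheKeepsLineage` and its helpers are illustrative names), that measures the dependency chain after the cache-then-count pattern:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CacheKeepsLineage {
  // Length of the longest chain reachable through `dependencies`.
  // Recursive, which is fine for the short chains used here.
  def lineageDepth(rdd: RDD[_]): Int =
    1 + rdd.dependencies.map(d => lineageDepth(d.rdd)).foldLeft(0)((a, b) => math.max(a, b))

  // Applies `n` map steps, caching and counting after each one.
  def cachedChain(sc: SparkContext, n: Int): RDD[Int] = {
    var rdd: RDD[Int] = sc.parallelize(1 to 10)
    for (_ <- 1 to n) {
      rdd = rdd.map(_ + 1)
      rdd.cache()
      rdd.count() // materializes the partitions but leaves `dependencies` unchanged
    }
    rdd
  }
}
```

After five cached-and-counted `map` steps the chain is still six RDDs deep (five `map` results plus the original `parallelize`), however often the data has been materialized.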
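Another alternative is `checkpoint`. I tried checkpointing the vertices and edges of my `Graph` object and then materializing them with `count`. Then I used `.isCheckpointed` to check whether the checkpoint succeeded, but it always returns false.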
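Two requirements of RDD checkpointing are easy to miss: a checkpoint directory must be set with `sc.setCheckpointDir` (otherwise `checkpoint()` throws), and the Spark API docs say `checkpoint()` must be called before any job has been executed on that RDD; the files are written when the next action on it completes. A minimal sketch under those assumptions, on a plain RDD rather than a `Graph` (`CheckpointTruncatesLineage` and its helpers are illustrative names):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CheckpointTruncatesLineage {
  // Length of the longest chain reachable through `dependencies`.
  def lineageDepth(rdd: RDD[_]): Int =
    1 + rdd.dependencies.map(d => lineageDepth(d.rdd)).foldLeft(0)((a, b) => math.max(a, b))

  // Assumes sc.setCheckpointDir(...) has already been called.
  def checkpointedChain(sc: SparkContext, n: Int): RDD[Int] = {
    var rdd: RDD[Int] = sc.parallelize(1 to 10)
    for (_ <- 1 to n) rdd = rdd.map(_ + 1)
    rdd.cache()      // recommended by the docs, so writing the checkpoint does not recompute the chain
    rdd.checkpoint() // mark BEFORE the first action on this RDD
    rdd.count()      // runs the job, then writes the checkpoint files
    rdd
  }
}
```

After the `count()`, `isCheckpointed` is true, yet `.dependencies` is still non-empty: it holds a single dependency on the RDD that reads the checkpoint files, which itself has none. So a non-empty `.dependencies` does not by itself mean the long lineage is still attached. Later GraphX releases also expose `Graph.checkpoint()` and `Graph.isCheckpointed`, which apply the same rules to a graph's vertices and edges.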
Update: I wrote a simplified version of the code that reproduces the problem.
```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

def main(args: Array[String]) {
  val conf = new SparkConf().setAppName("HDTM")
    .setMaster("local[4]")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", "edu.nd.dsg.hdtm.util.HDTMKryoRegistrator")
  val sc = new SparkContext(conf)
  val v = sc.parallelize(Seq[(VertexId, Long)]((0L, 0L), (1L, 1L), (2L, 2L)))
  val e = sc.parallelize(Seq[Edge[Long]](Edge(0L, 1L, 0L), Edge(1L, 2L, 1L), Edge(2L, 0L, 2L)))
  val newGraph = Graph(v, e)
  var currentGraph = newGraph
  val vertexIds = currentGraph.vertices.map(_._1).collect()
  for (i <- 1 to 1000) {
    var g = currentGraph
    vertexIds.toStream.foreach(id => {
      g = Graph(currentGraph.vertices, currentGraph.edges)
      g.cache()
      g.edges.cache()
      g.vertices.cache()
      g.vertices.count()
      g.edges.count()
    })
    currentGraph.unpersistVertices(blocking = false)
    currentGraph.edges.unpersist(blocking = false)
    currentGraph = g
    println(" iter " + i + " finished")
  }
}
```
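Update:
Below is the code. I removed most of the unnecessary methods so the code is minimal, but it may not make sense if you think about what it actually does.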
```scala
import scala.reflect.ClassTag
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// WikiDataVertex, WikiGraphLoader and GraphAlgorithm are classes from my own project (not shown).
object StackOverFlow {
  final val PATH = "./"

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("HDTM")
      .setMaster("local[4]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "edu.nd.dsg.hdtm.util.HDTMKryoRegistrator")
    val sc = new SparkContext(conf)
    val filePath = PATH + "src/test/resources/binary.txt"
    val wikiGraph: Graph[WikiDataVertex, Double] = WikiGraphLoader.loadGraphFromTestHDTMFile(sc, filePath)
    wikiGraph.cache()
    val root = 0L
    val bfsGraph = GraphAlgorithm.initializeGraph(wikiGraph, root, sc)
    bfsGraph.cache()
    val vertexIds = bfsGraph.vertices.map(_._1).collect()
    var currentGraph = bfsGraph
    for (i <- 1 to 1000) {
      var g = currentGraph
      vertexIds.toStream.foreach(id => {
        g = samplePath(g, id, root)
      })
      currentGraph.unpersistVertices(blocking = false)
      currentGraph.edges.unpersist(blocking = false)
      currentGraph = g
      println(" iter " + i + " finished")
    }
  }

  def samplePath[ED: ClassTag](graph: Graph[WikiDataVertex, ED],
                               instance: VertexId, root: VertexId): Graph[WikiDataVertex, ED] = {
    if (instance == 0L) return graph
    val (removedGraph, remainedGraph) = splitGraph(graph, instance)
    /**
     * Here I omit some other code, which changes the attributes of removedGraph and remainedGraph
     */
    val newVertices = graph.outerJoinVertices(removedGraph.vertices ++ remainedGraph.vertices)({
      (vid, vd, opt) => {
        opt.getOrElse(vd)
      }
    }).vertices
    val newEdges = graph.edges.map(edge => {
      if (edge.dstId == instance)
        // In the real code, edge.srcId is replaced by a vertex id computed by other functions
        edge.copy(srcId = edge.srcId)
      else
        edge.copy()
    })
    val g = Graph(newVertices, newEdges)
    g.vertices.cache()
    g.edges.cache()
    g.cache()
    g.vertices.count()
    g.edges.count()
    remainedGraph.unpersistVertices(blocking = false)
    remainedGraph.edges.unpersist(blocking = false)
    removedGraph.unpersistVertices(blocking = false)
    removedGraph.edges.unpersist(blocking = false)
    g
  }

  /**
   * Split a graph into two sub-graphs by a vertex `instance`.
   * Edges that end at `instance` will be lost.
   * @param graph Graph that will be separated
   * @param instance Vertex that we are using to separate the graph
   * @tparam ED Edge type
   * @return (sub-graph with `instance`, sub-graph without `instance`)
   **/
  def splitGraph[ED: ClassTag]
      (graph: Graph[WikiDataVertex, ED], instance: VertexId): (Graph[WikiDataVertex, ED], Graph[WikiDataVertex, ED]) = {
    val nGraph = GraphAlgorithm.graphWithOutDegree(graph)
    // This is used twice, so cache it to prevent recomputation
    nGraph.cache()
    val wGraph = nGraph.subgraph(epred = e => e.dstAttr._1.path.contains(instance) ||
      e.srcAttr._1.path.contains(instance),
      vpred = (id, vd) => vd._1.path.contains(instance))
    val woGraph = nGraph.subgraph(epred = e => !e.dstAttr._1.path.contains(instance) &&
      !e.srcAttr._1.path.contains(instance),
      vpred = (id, vd) => !vd._1.path.contains(instance))
    val removedGraph = Graph(wGraph.vertices.mapValues(_._1), wGraph.edges, null)
    val remainedGraph = Graph(woGraph.vertices.mapValues(_._1), woGraph.edges, null)
    removedGraph.vertices.count()
    removedGraph.edges.count()
    removedGraph.cache()
    remainedGraph.vertices.count()
    remainedGraph.edges.count()
    remainedGraph.cache()
    nGraph.unpersistVertices(blocking = false)
    nGraph.edges.unpersist(blocking = false)
    (removedGraph, remainedGraph)
  }
}
```
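In a loop like the one above, every `samplePath` call builds a new graph from the previous one, so the lineage grows by several RDDs per vertex per iteration, and `cache`/`count` alone never cuts it. One common workaround is to checkpoint every few steps. A sketch, assuming a GraphX version that provides `Graph.checkpoint()` and `Graph.isCheckpointed` and that `sc.setCheckpointDir` has been called; `CheckpointedIteration.iterate` is a hypothetical helper, and `interval` is a tuning knob rather than a recommended value:

```scala
import scala.reflect.ClassTag
import org.apache.spark.graphx._

object CheckpointedIteration {
  // Applies `step` n times, checkpointing the result every `interval` steps.
  // Assumes `step` returns a NEW graph (the previous one is unpersisted) and
  // does not itself run actions on the graph it returns.
  def iterate[VD: ClassTag, ED: ClassTag](start: Graph[VD, ED], n: Int, interval: Int)(
      step: Graph[VD, ED] => Graph[VD, ED]): Graph[VD, ED] = {
    require(interval > 0, "interval must be positive")
    var current = start
    for (i <- 1 to n) {
      val next = step(current)
      next.cache()
      if (i % interval == 0) next.checkpoint() // mark before the first action on `next`
      next.vertices.count() // materialize vertices (and write their checkpoint, if marked)
      next.edges.count()    // same for edges
      current.unpersist(blocking = false)
      current = next
    }
    current
  }
}
```

Because `samplePath` already calls `count()` on the graph it returns, applying this idea to the code above would mean moving the `checkpoint()` call inside `samplePath`, before those counts; otherwise the graph has already been materialized when it is marked.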
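During the first 10 iterations it runs fast; after that, each iteration takes a little longer. In the Spark WebUI, the actual execution time of each operation is almost the same, but the Scheduler Delay keeps growing as the number of iterations increases. After 20 hours of iterating, it throws a `StackOverflowError`.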
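A plausible link between the growing Scheduler Delay and the eventual `StackOverflowError` is that each stage's tasks are shipped with a serialized copy of the RDD and its dependencies, and Java serialization of a deep object chain is recursive. The toy below (plain Scala; `Step` and `SerializedLineage` are hypothetical names, not Spark classes) shows both effects on a linked chain: the serialized size grows with its length, and a very long chain overflows the stack.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Toy lineage link (hypothetical): each step references the one before it.
final class Step(val id: Int, val parent: Step) extends Serializable

object SerializedLineage {
  // Builds a chain of n steps iteratively (no recursion while building).
  def chain(n: Int): Step = {
    var cur: Step = null
    for (i <- 1 to n) cur = new Step(i, cur)
    cur
  }

  // Serializes an object graph with plain Java serialization and returns its size in bytes.
  // ObjectOutputStream follows the `parent` references recursively.
  def serializedSize(obj: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.size()
  }

  // True if serializing a chain of length n overflows the current thread's stack.
  def overflows(n: Int): Boolean =
    try { serializedSize(chain(n)); false }
    catch { case _: StackOverflowError => true }
}
```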
Can you post your code? It seems the `StackOverflowError` is not related to RDD lineage. – cloud
The StackOverflow may be caused by some non-terminating recursion rather than by RDD lineage. Yes, please show some code. – maasg
@maasg I have attached the code, thanks for your help! – bxshi