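How does rdd.cache work?
In the code below, I create an RDD, cache it, and derive a chain of child RDDs from it: vertexRDD -> newVert1 -> newVert2 -> newVert3 -> newVert4 -> newVert5. Each child RDD applies a simple increment operation.
When caching is not enabled, the array values for the newVert1, newVert2, and newVert3 RDDs are (one column per RDD, left to right):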
0:ArrayBuffer(1, 0, 0, 0) 0:ArrayBuffer(2, 0, 0, 0) 0:ArrayBuffer(3, 0, 0, 0)
1:ArrayBuffer(0, 1, 0, 0) 1:ArrayBuffer(0, 2, 0, 0) 1:ArrayBuffer(0, 3, 0, 0)
2:ArrayBuffer(0, 0, 1, 0) 2:ArrayBuffer(0, 0, 2, 0) 2:ArrayBuffer(0, 0, 3, 0)
3:ArrayBuffer(0, 0, 0, 1) 3:ArrayBuffer(0, 0, 0, 2) 3:ArrayBuffer(0, 0, 0, 3)
However, when the cache method is called on the parent RDD, the array values for the newVert1, newVert2, and newVert3 RDDs are:
0:ArrayBuffer(1, 0, 0, 0) 0:ArrayBuffer(3, 0, 0, 0) 0:ArrayBuffer(6, 0, 0, 0)
1:ArrayBuffer(0, 1, 0, 0) 1:ArrayBuffer(0, 3, 0, 0) 1:ArrayBuffer(0, 6, 0, 0)
2:ArrayBuffer(0, 0, 1, 0) 2:ArrayBuffer(0, 0, 3, 0) 2:ArrayBuffer(0, 0, 6, 0)
3:ArrayBuffer(0, 0, 0, 1) 3:ArrayBuffer(0, 0, 0, 3) 3:ArrayBuffer(0, 0, 0, 6)
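This seems to be related to the RDD's foreach method: each call ends up incrementing the values of the array elements again.
I have posted the results in the code, as comments below each print statement. Please help me understand how the cache operation works. Thanks.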
package com.examples

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object ReCalculationModified {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local[1]").setAppName("RecalculationModified")
    val sc = new SparkContext(conf)
    val vertex = Array(
      (0, Array(0, 0, 0, 0)),
      (1, Array(0, 0, 0, 0)),
      (2, Array(0, 0, 0, 0)),
      (3, Array(0, 0, 0, 0)))
    val vertexRDD = sc.makeRDD(vertex).map(x => x).cache()
    val newVert1 = vertexRDD.map {
      case (vid, array) =>
        array(vid) += 1
        (vid, array)
    }
    println("--------------newVertex1-------------")
    newVert1.foreach(x => println(x._1 + ":" + x._2.toBuffer))
    println("--------------VertexRDD-------------")
    vertexRDD.foreach(x => println(x._1 + ":" + x._2.toBuffer))
    // Output of both vertexRDD and newVert1 is
    // 0:ArrayBuffer(1, 0, 0, 0)
    // 1:ArrayBuffer(0, 1, 0, 0)
    // 2:ArrayBuffer(0, 0, 1, 0)
    // 3:ArrayBuffer(0, 0, 0, 1)
    val newVert2 = newVert1.map {
      case (vid, array) =>
        array(vid) += 1
        (vid, array)
    }
    println("--------------newVertex2-------------")
    newVert2.foreach(x => println(x._1 + ":" + x._2.toBuffer))
    println("--------------VertexRDD-------------")
    vertexRDD.foreach(x => println(x._1 + ":" + x._2.toBuffer))
    // Output of both vertexRDD and newVert2 is
    // 0:ArrayBuffer(3, 0, 0, 0)
    // 1:ArrayBuffer(0, 3, 0, 0)
    // 2:ArrayBuffer(0, 0, 3, 0)
    // 3:ArrayBuffer(0, 0, 0, 3)
    val newVert3 = newVert2.map {
      case (vid, array) =>
        array(vid) += 1
        (vid, array)
    }
    println("--------------newVertex3-------------")
    newVert3.foreach(x => println(x._1 + ":" + x._2.toBuffer))
    println("--------------VertexRDD-------------")
    vertexRDD.foreach(x => println(x._1 + ":" + x._2.toBuffer))
    // Output of both vertexRDD and newVert3 is
    // 0:ArrayBuffer(6, 0, 0, 0)
    // 1:ArrayBuffer(0, 6, 0, 0)
    // 2:ArrayBuffer(0, 0, 6, 0)
    // 3:ArrayBuffer(0, 0, 0, 6)
    val newVert4 = newVert3.map {
      case (vid, array) =>
        array(vid) += 1
        (vid, array)
    }
    println("--------------newVertex4-------------")
    newVert4.foreach(x => println(x._1 + ":" + x._2.toBuffer))
    println("--------------VertexRDD-------------")
    vertexRDD.foreach(x => println(x._1 + ":" + x._2.toBuffer))
    // Output of both vertexRDD and newVert4 is
    // 0:ArrayBuffer(10, 0, 0, 0)
    // 1:ArrayBuffer(0, 10, 0, 0)
    // 2:ArrayBuffer(0, 0, 10, 0)
    // 3:ArrayBuffer(0, 0, 0, 10)
    val newVert5 = newVert4.map {
      case (vid, array) =>
        array(vid) += 1
        (vid, array)
    }
    println("--------------newVertex5-------------")
    newVert5.foreach(x => println(x._1 + ":" + x._2.toBuffer))
    println("--------------VertexRDD-------------")
    vertexRDD.foreach(x => println(x._1 + ":" + x._2.toBuffer))
    // Output of both vertexRDD and newVert5 is
    // 0:ArrayBuffer(15, 0, 0, 0)
    // 1:ArrayBuffer(0, 15, 0, 0)
    // 2:ArrayBuffer(0, 0, 15, 0)
    // 3:ArrayBuffer(0, 0, 0, 15)
  }
}
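One possible reading of the 1, 3, 6, 10, 15 sequence, sketched without Spark. It assumes that cache() (default storage level MEMORY_ONLY) keeps the rows as live objects in memory, so every action replays its chain of map steps on the same Array instances, while the uncached run starts each action from a fresh copy of the source data. The names CacheMutationSketch, step, runAction, cachedRun and uncachedRun are hypothetical and only model the first row (vid 0); this is a simulation of the suspected mechanism, not Spark's actual implementation.

```scala
object CacheMutationSketch {
  // Stand-in for one map step from the question: mutates the array in place
  // and returns the very same array object.
  def step(vid: Int, array: Array[Int]): (Int, Array[Int]) = {
    array(vid) += 1
    (vid, array)
  }

  // Simulates an action on the k-th child RDD: all k map steps in its
  // lineage are applied to the row it receives.
  def runAction(row: (Int, Array[Int]), steps: Int): (Int, Array[Int]) =
    (1 to steps).foldLeft(row) { case ((vid, arr), _) => step(vid, arr) }

  // "Cached" parent: one shared array object is reused by every action,
  // so increments from earlier actions are still there.
  def cachedRun(actions: Int): Seq[Int] = {
    val cachedRow = (0, Array(0, 0, 0, 0))
    (1 to actions).map(k => runAction(cachedRow, k)._2(0))
  }

  // "Uncached" parent: every action starts from a fresh array of zeros.
  def uncachedRun(actions: Int): Seq[Int] =
    (1 to actions).map(k => runAction((0, Array(0, 0, 0, 0)), k)._2(0))

  def main(args: Array[String]): Unit = {
    println(cachedRun(5))   // Vector(1, 3, 6, 10, 15)
    println(uncachedRun(5)) // Vector(1, 2, 3, 4, 5)
  }
}
```

Under that assumption, the action on newVert2 adds 2 to a value that is already 1, the action on newVert3 adds 3 to a value that is already 3, and so on, which matches the commented output in the code above.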
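_"Without calling the cache method, the results are as expected and understandable, but when the parent RDD is cached, I can't understand the results."_ Could you explain what you mean? Thanks! –
You can take a look at these discussions of similar issues: https://stackoverflow.com/questions/34513894/apache-sparks-rddvector-immutability-issue and https://community.hortonworks.com/questions/18708/are-spark-rdd-really-mutable.html – Shaido
@Jacek - I mean that the value in the array is incremented by 1 for each child RDD. Below is the output from the newVert1 RDD: 0:ArrayBuffer(1,0,0,0) 1:ArrayBuffer(0,1,0,0) 2:ArrayBuffer(0,0,1,0) 3:ArrayBuffer(0,0,0,1) – Kannan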
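If the goal is for each child RDD to see its parent's value plus 1 regardless of caching, one option is to stop mutating the input array and increment a copy instead. The sketch below shows the idea in plain Scala, without Spark; CopyOnMapSketch, pureStep, runAction and cachedRun are hypothetical names, and the shared cachedRow only stands in for a row held by a cached RDD. In the question's code this would presumably correspond to cloning the array inside each map before incrementing it.

```scala
object CopyOnMapSketch {
  // Non-mutating variant of the map step: the increment is applied to a
  // copy, so the input array is left untouched.
  def pureStep(vid: Int, array: Array[Int]): (Int, Array[Int]) = {
    val copy = array.clone()
    copy(vid) += 1
    (vid, copy)
  }

  // Simulates an action on the k-th child: k non-mutating steps in a row.
  def runAction(row: (Int, Array[Int]), steps: Int): (Int, Array[Int]) =
    (1 to steps).foldLeft(row) { case ((vid, arr), _) => pureStep(vid, arr) }

  // All actions share one "cached" row; returns the value seen by each
  // child together with the final state of the shared row.
  def cachedRun(actions: Int): (Seq[Int], List[Int]) = {
    val cachedRow = (0, Array(0, 0, 0, 0))
    val seen = (1 to actions).map(k => runAction(cachedRow, k)._2(0))
    (seen, cachedRow._2.toList)
  }

  def main(args: Array[String]): Unit = {
    val (seen, shared) = cachedRun(5)
    println(seen)   // Vector(1, 2, 3, 4, 5)
    println(shared) // List(0, 0, 0, 0)
  }
}
```

The trade-off is one extra array allocation per row per map step, in exchange for results that do not depend on whether the parent is cached.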