2017-09-19 64 views

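How does rdd.cache work?

In the code below, I create an RDD, cache it, and derive a chain of child RDDs from it: vertexRDD -> newVert1 -> newVert2 -> newVert3 -> newVert4 -> newVert5. Each RDD performs a simple increment operation.
When caching is not enabled, the array values for the newVert1, newVert2, and newVert3 RDDs are: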

0:ArrayBuffer(1, 0, 0, 0) 0:ArrayBuffer(2, 0, 0, 0) 0:ArrayBuffer(3, 0, 0, 0) 
1:ArrayBuffer(0, 1, 0, 0) 1:ArrayBuffer(0, 2, 0, 0) 1:ArrayBuffer(0, 3, 0, 0) 
2:ArrayBuffer(0, 0, 1, 0) 2:ArrayBuffer(0, 0, 2, 0) 2:ArrayBuffer(0, 0, 3, 0) 
3:ArrayBuffer(0, 0, 0, 1) 3:ArrayBuffer(0, 0, 0, 2) 3:ArrayBuffer(0, 0, 0, 3) 

However, when the cache method is called on the parent RDD, the array values for the newVert1, newVert2, and newVert3 RDDs are:

0:ArrayBuffer(1, 0, 0, 0) 0:ArrayBuffer(3, 0, 0, 0) 0:ArrayBuffer(6, 0, 0, 0) 
1:ArrayBuffer(0, 1, 0, 0) 1:ArrayBuffer(0, 3, 0, 0) 1:ArrayBuffer(0, 6, 0, 0) 
2:ArrayBuffer(0, 0, 1, 0) 2:ArrayBuffer(0, 0, 3, 0) 2:ArrayBuffer(0, 0, 6, 0) 
3:ArrayBuffer(0, 0, 0, 1) 3:ArrayBuffer(0, 0, 0, 3) 3:ArrayBuffer(0, 0, 0, 6) 

This has something to do with the RDD's foreach method. That method seems to be incrementing the values of the array elements.

I have posted the results in the code, as comments below each print statement. Please help me understand the cache operation. Thanks.

package com.examples 

import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 

object ReCalculationModified { 
    def main(args: Array[String]): Unit = { 
    val conf = new SparkConf().setMaster("local[1]").setAppName("RecalculationModified") 
    val sc = new SparkContext(conf) 

    val vertex = Array(
    (0, Array(0, 0, 0, 0)), 
    (1, Array(0, 0, 0, 0)), 
    (2, Array(0, 0, 0, 0)), 
    (3, Array(0, 0, 0, 0))) 

    val vertexRDD = sc.makeRDD(vertex).map(x => x).cache() 
    val newVert1 = vertexRDD.map { 
     case (vid, array) => 
     array(vid) += 1 
     (vid, array) 
    } 
    println("--------------newVertex1-------------") 
    newVert1.foreach(x => println(x._1 + ":" + x._2.toBuffer)) 
    println("--------------VertexRDD-------------") 
    vertexRDD.foreach(x => println(x._1 + ":" + x._2.toBuffer)) 

    // Output of both vertexRDD and newVert1 is 
    //  0:ArrayBuffer(1, 0, 0, 0) 
    //  1:ArrayBuffer(0, 1, 0, 0) 
    //  2:ArrayBuffer(0, 0, 1, 0) 
    //  3:ArrayBuffer(0, 0, 0, 1) 

    val newVert2 = newVert1.map { 
     case (vid, array) => 
     array(vid) += 1 
     (vid, array) 
    } 
    println("--------------newVertex2-------------") 
    newVert2.foreach(x => println(x._1 + ":" + x._2.toBuffer)) 
    println("--------------VertexRDD-------------") 
    vertexRDD.foreach(x => println(x._1 + ":" + x._2.toBuffer)) 

    // Output of both vertexRDD and newVert2 is 
    //  0:ArrayBuffer(3, 0, 0, 0) 
    //  1:ArrayBuffer(0, 3, 0, 0) 
    //  2:ArrayBuffer(0, 0, 3, 0) 
    //  3:ArrayBuffer(0, 0, 0, 3) 

    val newVert3 = newVert2.map { 
     case (vid, array) => 
     array(vid) += 1 
     (vid, array) 
    } 
    println("--------------newVertex3-------------") 
    newVert3.foreach(x => println(x._1 + ":" + x._2.toBuffer)) 
    println("--------------VertexRDD-------------") 
    vertexRDD.foreach(x => println(x._1 + ":" + x._2.toBuffer)) 

    // Output of both vertexRDD and newVert3 is 
    //  0:ArrayBuffer(6, 0, 0, 0) 
    //  1:ArrayBuffer(0, 6, 0, 0) 
    //  2:ArrayBuffer(0, 0, 6, 0) 
    //  3:ArrayBuffer(0, 0, 0, 6) 

    val newVert4 = newVert3.map { 
     case (vid, array) => 
     array(vid) += 1 
     (vid, array) 
    } 
    println("--------------newVertex4-------------") 
    newVert4.foreach(x => println(x._1 + ":" + x._2.toBuffer)) 
    println("--------------VertexRDD-------------") 
    vertexRDD.foreach(x => println(x._1 + ":" + x._2.toBuffer)) 

    // Output of both vertexRDD and newVert4 is 
    //  0:ArrayBuffer(10, 0, 0, 0) 
    //  1:ArrayBuffer(0, 10, 0, 0) 
    //  2:ArrayBuffer(0, 0, 10, 0) 
    //  3:ArrayBuffer(0, 0, 0, 10) 

    val newVert5 = newVert4.map { 
     case (vid, array) => 
     array(vid) += 1 
     (vid, array) 
    } 
    println("--------------newVertex5-------------") 
    newVert5.foreach(x => println(x._1 + ":" + x._2.toBuffer)) 
    println("--------------VertexRDD-------------") 
    vertexRDD.foreach(x => println(x._1 + ":" + x._2.toBuffer)) 

    // Output of both vertexRDD and newVert5 is 
    //  0:ArrayBuffer(15, 0, 0, 0) 
    //  1:ArrayBuffer(0, 15, 0, 0) 
    //  2:ArrayBuffer(0, 0, 15, 0) 
    //  3:ArrayBuffer(0, 0, 0, 15) 

    } 
} 
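_"Without calling the cache method, the result is as expected and understandable, but when the parent RDD is cached, I cannot understand the result."_ Can you explain what you mean? Thanks! –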

You can look at these discussions of similar problems: https://stackoverflow.com/questions/34513894/apache-sparks-rddvector-immutability-issue and https://community.hortonworks.com/questions/18708/are-spark-rdd-really-mutable.html – Shaido

@Jacek - I mean that the values in the array are incremented by 1 for each child RDD. Below is the output from the newVert1 RDD: 0:ArrayBuffer(1,0,0,0) 1:ArrayBuffer(0,1,0,0) 2:ArrayBuffer(0,0,0,0) 3:ArrayBuffer(0,0,0,1) – Kannan

Answer

The values are being updated because you are using an Array, and arrays are mutable in Scala. See the discussion below.

Why no immutable arrays in scala standard library?
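To make the mutability point concrete, here is a minimal plain-Scala sketch (no Spark involved). The object name ArrayMutationDemo and the helpers incrementInPlace and incrementCopy are made up for illustration. The first helper does what the map closures in the question do; the second is one possible non-mutating variant, assuming that building a new array per record is acceptable.

```scala
object ArrayMutationDemo {
  // Mutates its argument and returns the very same instance,
  // like the map closures in the question.
  def incrementInPlace(vid: Int, array: Array[Int]): (Int, Array[Int]) = {
    array(vid) += 1
    (vid, array)
  }

  // Builds a new array with one element changed; the argument is left untouched.
  def incrementCopy(vid: Int, array: Array[Int]): (Int, Array[Int]) =
    (vid, array.updated(vid, array(vid) + 1))

  def main(args: Array[String]): Unit = {
    val original = Array(0, 0, 0, 0)
    val (_, same) = incrementInPlace(0, original)
    println(original.toList)  // List(1, 0, 0, 0): the input itself was changed
    println(same eq original) // true: both names point at one array

    val fresh = Array(0, 0, 0, 0)
    val (_, copied) = incrementCopy(0, fresh)
    println(fresh.toList)     // List(0, 0, 0, 0): the input is unchanged
    println(copied.toList)    // List(1, 0, 0, 0)
  }
}
```

If the closures passed to map returned a new array in this way, whatever object the parent RDD holds would presumably never be modified by its children.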

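Also, cache is a lazy Spark operation that marks the RDD to be kept in memory once an action first computes it. Caching only avoids re-running the DAG from the start: when a cached RDD is reused, the already processed data is read from memory instead of going back through the whole DAG and computing the same result again.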
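As a rough mental model only (this is not Spark's implementation), caching can be pictured as memoizing the result of a recomputation. In the sketch below the class Source, its compute function, and the computeCount counter are all hypothetical names chosen for illustration; each call to get() stands in for one action.

```scala
object CacheModel {
  // Hypothetical stand-in for one step of a lineage: `compute` rebuilds the data from scratch.
  final class Source[T](compute: () => T, cached: Boolean) {
    var computeCount = 0
    private var stored: Option[T] = None

    // Called once per "action".
    def get(): T =
      if (!cached) {
        computeCount += 1
        compute()
      } else stored.getOrElse {
        computeCount += 1
        val value = compute()
        stored = Some(value) // keep the computed object for later calls
        value
      }
  }

  def main(args: Array[String]): Unit = {
    val uncached = new Source(() => Array(0, 0, 0, 0), cached = false)
    val cachedSrc = new Source(() => Array(0, 0, 0, 0), cached = true)
    (1 to 3).foreach { _ => uncached.get(); cachedSrc.get() }
    println(uncached.computeCount)  // 3: recomputed for every action
    println(cachedSrc.computeCount) // 1: computed once, then reused
    println(cachedSrc.get() eq cachedSrc.get()) // true: the same instance each time
  }
}
```

The last line is the part that matters for this question: in this model the cached source hands out the same object on every call, so anything that mutates that object changes what later calls see.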

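Caching is one of the features that lets Spark run much faster than MapReduce on workloads that reuse the same data; speedups of 10x to 100x are often quoted.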

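That is fine. I understand that cache saves the processed data. Here, the arrays in vertexRDD are mutated by incrementing an array element by 1. Next, the child RDD of vertexRDD (newVert1) increments by 1. The next child RDD, of newVert1 (newVert2), increments by 1, but now the value is 3, and when the third child (newVert3) increments by 1, the result is different again. How is the increment being computed? I have posted the results as comments. – Kannan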
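One way to account for the 1, 3, 6, 10, 15 sequence is sketched below in plain Scala. It is a model of the behaviour, not Spark code. It assumes the cached vertexRDD hands out the same Array instances on every action, which is how a deserialized in-memory cache behaves in a single local JVM, and that without a cache every action starts again from a fresh copy of the input. The names CachedMutationModel and simulate are made up for illustration.

```scala
object CachedMutationModel {
  // Returns the value seen at index `vid` after the action on newVert1, newVert2, ...
  def simulate(cacheSource: Boolean, vid: Int = 0, children: Int = 5): List[Int] = {
    val cachedRecord = Array(0, 0, 0, 0) // the single instance a cache would hand out
    (1 to children).toList.map { k =>
      // Without a cache, each action recomputes the lineage from a fresh input array.
      val record = if (cacheSource) cachedRecord else Array(0, 0, 0, 0)
      // newVertK is a chain of k maps, each doing array(vid) += 1 on the same instance.
      (1 to k).foreach(_ => record(vid) += 1)
      record(vid)
    }
  }

  def main(args: Array[String]): Unit = {
    println(simulate(cacheSource = false)) // List(1, 2, 3, 4, 5)
    println(simulate(cacheSource = true))  // List(1, 3, 6, 10, 15)
  }
}
```

Under these assumptions, the action on newVertK re-runs all K map steps against the already mutated cached array, so the value grows by K each time: 0 + 1 = 1, 1 + 2 = 3, 3 + 3 = 6, 6 + 4 = 10, 10 + 5 = 15. That would also explain why vertexRDD prints the same values as the child RDD that was just evaluated.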