2016-11-16 68 views
4

我跑了火花外殼以下工作:星火UI DAG階段斷開

val d = sc.parallelize(0 until 1000000).map(i => (i%100000, i)).persist 
d.join(d.reduceByKey(_ + _)).collect 

星火UI顯示三個階段。階段4和5對應於d的計算,並且階段6對應於對collect動作的計算。由於d持續存在,我預計只有兩個階段。然而階段5目前沒有連接到任何其他階段。

Spark UI DAG

所以嘗試沒有堅持用運行相同的計算,以及DAG貌似相同,只是沒有表示RDD的綠點已經堅持。

Spark UI DAG without persist

我期望級11的輸出是連接到平臺12的輸入,但事實並非如此。

看着舞臺描述,階段似乎表明d正在持續,因爲階段5有輸入,但我仍然困惑,爲什麼階段5甚至存在。

Spark UI stages

Spark UI stages without persist

回答

1
  1. 輸入RDD緩存和緩存部分不重新計算。

    這可以用一個簡單的測試來驗證:

    import org.apache.spark.SparkContext 
    
    def f(sc: SparkContext) = { 
        val counter = sc.longAccumulator("counter") 
        val rdd = sc.parallelize(0 until 100).map(i => { 
        counter.add(1L) 
        (i%10, i) 
        }).persist 
        rdd.join(rdd.reduceByKey(_ + _)).foreach(_ =>()) 
        counter.value 
    } 
    
    assert(f(spark.sparkContext) == 100) 
    
  2. 緩存不會從DAG中刪除階段。

    如果數據被緩存了相應的階段can be marked as skipped但仍然是DAG的一部分。可以使用檢查點截斷譜系,但它不是一回事,也不會從可視化中刪除階段。

  3. 輸入階段包含超過高速緩存的計算。

    Spark階段將可以鏈接而不執行混洗的操作組合在一起。

    雖然輸入階段的一部分被緩存,但不包括準備洗牌文件所需的所有操作。這就是爲什麼你沒有看到跳過的任務。

  4. 其餘(detachment)只是圖形可視化的限制。

  5. 如果首先重新分區的數據:

    import org.apache.spark.HashPartitioner 
    
    val d = sc.parallelize(0 until 1000000) 
        .map(i => (i%100000, i)) 
        .partitionBy(new HashPartitioner(20)) 
    
    d.join(d.reduceByKey(_ + _)).collect 
    

    你會得到DAG你最有可能尋找:

    enter image description here

0

添加到user6910411的詳細解答,RDD直到第一個動作運行並且它計算整個DAG時,由於對RDD的惰性評估,它並沒有被保留在內存中。因此,當您第一次運行collect()時,RDD「d」會第一次被保存在內存中,但沒有任何內容被讀取。如果您第二次運行collect(),則會讀取緩存的RDD。

此外,如果執行對最終的RDD一個toDebugString,它示出了下面的輸出:

scala> d.join(d.reduceByKey(_ + _)).toDebugString 
res5: String = 
(4) MapPartitionsRDD[19] at join at <console>:27 [] 
| MapPartitionsRDD[18] at join at <console>:27 [] 
| CoGroupedRDD[17] at join at <console>:27 [] 
+-(4) MapPartitionsRDD[15] at map at <console>:24 [] 
| | ParallelCollectionRDD[14] at parallelize at <console>:24 [] 
| ShuffledRDD[16] at reduceByKey at <console>:27 [] 
+-(4) MapPartitionsRDD[15] at map at <console>:24 [] 
    | ParallelCollectionRDD[14] at parallelize at <console>:24 [] 

的上面粗略圖形表示可以被示出爲:RDD Stages