How do I interpret an Apache Spark RDD lineage graph? I have the following code and a few questions about it:

val input1 = rawinput.map(_.split("\t")).map(x=>(x(6).trim(),x)).sortByKey() 
val input2 = input1.map(x=> x._2.mkString("\t")) 
val x0 = input2.map(_.split("\t")).map(x => (x(6),x(0))) 
val x1 = input2.map(_.split("\t")).map(x => (x(6),x(1))) 
val x2 = input2.map(_.split("\t")).map(x => (x(6),x(2))) 
val x3 = input2.map(_.split("\t")).map(x => (x(6),x(3))) 
val x4 = input2.map(_.split("\t")).map(x => (x(6),x(4))) 
val x5 = input2.map(_.split("\t")).map(x => (x(6),x(5))) 
val x6 = input2.map(_.split("\t")).map(x => (x(6),x(6))) 
val x = x0 union x1 union x2 union x3 union x4 union x5 union x6 
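
For reference, the lineage graph below appears to be the output of RDD.toDebugString, which is the standard way to print an RDD's lineage:

// presumably how the lineage graph below was produced 
println(x.toDebugString) 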


**Lineage Graph:** 
(7) UnionRDD[25] at union at rddCustUtil.scala:78 [] 
| UnionRDD[24] at union at rddCustUtil.scala:78 [] 
| UnionRDD[23] at union at rddCustUtil.scala:78 [] 
| UnionRDD[22] at union at rddCustUtil.scala:78 [] 
| UnionRDD[21] at union at rddCustUtil.scala:78 [] 
| UnionRDD[20] at union at rddCustUtil.scala:78 [] 
| MapPartitionsRDD[7] at map at rddCustUtil.scala:43 [] 
| MapPartitionsRDD[6] at map at rddCustUtil.scala:43 [] 
| MapPartitionsRDD[5] at map at rddCustUtil.scala:40 [] 
| ShuffledRDD[4] at sortByKey at rddCustUtil.scala:38 [] 
+-(1) MapPartitionsRDD[3] at map at rddCustUtil.scala:38 [] 
    | MapPartitionsRDD[2] at map at rddCustUtil.scala:38 [] 
    | /Data/ MapPartitionsRDD[1] at textFile at rddCustUtil.scala:35 [] 
    | /Data/ HadoopRDD[0] at textFile at rddCustUtil.scala:35 [] 
| MapPartitionsRDD[9] at map at rddCustUtil.scala:48 [] 
| MapPartitionsRDD[8] at map at rddCustUtil.scala:48 [] 
| MapPartitionsRDD[5] at map at rddCustUtil.scala:40 [] 
| ShuffledRDD[4] at sortByKey at rddCustUtil.scala:38 [] 
+-(1) MapPartitionsRDD[3] at map at rddCustUtil.scala:38 [] 
    | MapPartitionsRDD[2] at map at rddCustUtil.scala:38 [] 
    | /Data/ MapPartitionsRDD[1] at textFile at rddCustUtil.scala:35 [] 
    | /Data/ HadoopRDD[0] at textFile at rddCustUtil.scala:35 [] 
| MapPartitionsRDD[11] at map at rddCustUtil.scala:53 [] 
| MapPartitionsRDD[10] at map at rddCustUtil.scala:53 [] 
| MapPartitionsRDD[5] at map at rddCustUtil.scala:40 [] 
| ShuffledRDD[4] at sortByKey at rddCustUtil.scala:38 [] 
+-(1) MapPartitionsRDD[3] at map at rddCustUtil.scala:38 [] 
    | MapPartitionsRDD[2] at map at rddCustUtil.scala:38 [] 
    | /Data/ MapPartitionsRDD[1] at textFile at rddCustUtil.scala:35 [] 
    | /Data/ HadoopRDD[0] at textFile at rddCustUtil.scala:35 [] 
| MapPartitionsRDD[13] at map at rddCustUtil.scala:58 [] 
| MapPartitionsRDD[12] at map at rddCustUtil.scala:58 [] 
| MapPartitionsRDD[5] at map at rddCustUtil.scala:40 [] 
| ShuffledRDD[4] at sortByKey at rddCustUtil.scala:38 [] 
+-(1) MapPartitionsRDD[3] at map at rddCustUtil.scala:38 [] 
    | MapPartitionsRDD[2] at map at rddCustUtil.scala:38 [] 
    | /Data/ MapPartitionsRDD[1] at textFile at rddCustUtil.scala:35 [] 
    | /Data/ HadoopRDD[0] at textFile at rddCustUtil.scala:35 [] 
| MapPartitionsRDD[15] at map at rddCustUtil.scala:63 [] 
| MapPartitionsRDD[14] at map at rddCustUtil.scala:63 [] 
| MapPartitionsRDD[5] at map at rddCustUtil.scala:40 [] 
| ShuffledRDD[4] at sortByKey at rddCustUtil.scala:38 [] 
+-(1) MapPartitionsRDD[3] at map at rddCustUtil.scala:38 [] 
    | MapPartitionsRDD[2] at map at rddCustUtil.scala:38 [] 
    | /Data/ MapPartitionsRDD[1] at textFile at rddCustUtil.scala:35 [] 
    | /Data/ HadoopRDD[0] at textFile at rddCustUtil.scala:35 [] 
| MapPartitionsRDD[17] at map at rddCustUtil.scala:68 [] 
| MapPartitionsRDD[16] at map at rddCustUtil.scala:68 [] 
| MapPartitionsRDD[5] at map at rddCustUtil.scala:40 [] 
| ShuffledRDD[4] at sortByKey at rddCustUtil.scala:38 [] 
+-(1) MapPartitionsRDD[3] at map at rddCustUtil.scala:38 [] 
    | MapPartitionsRDD[2] at map at rddCustUtil.scala:38 [] 
    | /Data/ MapPartitionsRDD[1] at textFile at rddCustUtil.scala:35 [] 
    | /Data/ HadoopRDD[0] at textFile at rddCustUtil.scala:35 [] 
| MapPartitionsRDD[19] at map at rddCustUtil.scala:73 [] 
| MapPartitionsRDD[18] at map at rddCustUtil.scala:73 [] 
| MapPartitionsRDD[5] at map at rddCustUtil.scala:40 [] 
| ShuffledRDD[4] at sortByKey at rddCustUtil.scala:38 [] 
+-(1) MapPartitionsRDD[3] at map at rddCustUtil.scala:38 [] 
    | MapPartitionsRDD[2] at map at rddCustUtil.scala:38 [] 
    | /Data/ MapPartitionsRDD[1] at textFile at rddCustUtil.scala:35 [] 
    | /Data/ HadoopRDD[0] at textFile at rddCustUtil.scala:35 [] 
  • Could you please explain how many shuffle stages will be executed, given that the graph shows ShuffledRDD[4] seven times?
  • Could you please give me a detailed explanation of this DAG flow?
  • Is this operation expensive?

Answer


**How many shuffle stages will be executed?**

Indeed, the shuffle needed to sort the data happens 7 times, because Spark's evaluation is lazy and runs on demand: unless it is cached, the sorted data will be recomputed for every branch of the DAG that needs it. To fix that (and, presumably, make this computation much faster), you can cache (or, more generally, persist) input2 before using it multiple times:

val input1 = rawinput.map(_.split("\t")).map(x=>(x(6).trim(),x)).sortByKey() 
val input2 = input1.map(x=> x._2.mkString("\t")).cache() 
// continue as before 
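
As an aside, persist lets you pick a storage level explicitly instead of the default memory-only cache(); a minimal sketch (choosing MEMORY_AND_DISK here is an assumption, not something the answer prescribes):

import org.apache.spark.storage.StorageLevel 

val input2 = input1.map(x => x._2.mkString("\t")) 
    .persist(StorageLevel.MEMORY_AND_DISK) // spill partitions to disk if they don't fit in memory 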

**Could you please give me a detailed explanation of the DAG flow?**

Each of the x_ RDDs is computed "separately", using the following computation:

+-(1) MapPartitionsRDD[3] at map at rddCustUtil.scala:38 [] 
    | MapPartitionsRDD[2] at map at rddCustUtil.scala:38 [] 
    | /Data/ MapPartitionsRDD[1] at textFile at rddCustUtil.scala:35 [] 
    | /Data/ HadoopRDD[0] at textFile at rddCustUtil.scala:35 [] 
| MapPartitionsRDD[9] at map at rddCustUtil.scala:48 [] 
| MapPartitionsRDD[8] at map at rddCustUtil.scala:48 [] 
| MapPartitionsRDD[5] at map at rddCustUtil.scala:40 [] 
| ShuffledRDD[4] at sortByKey at rddCustUtil.scala:38 [] 

which shows the computation that creates rawinput from textFile, followed by the sort operation and three map operations.

Then, you have 6 union operations unioning these 7 RDDs together.
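
For clarity, the six nested UnionRDDs ([20] through [25]) arise because the chain of infix unions is left-associative, so the expression in the question is equivalent to:

val x = (((((x0 union x1) union x2) union x3) union x4) union x5) union x6 

Each pairwise union contributes one UnionRDD to the lineage.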

**Is this operation expensive?**

Yes, it seems like it is. As mentioned above, caching can make it faster, but there is a better way to achieve this: without splitting the RDD into many separate ones at all:

val x = rawinput.map(_.split("\t")) 
    .keyBy(_(6).trim()) // extract key 
    .flatMap{ case (k, arr) => arr.take(7).zipWithIndex.map((k, _)) } // flatMap into (key, (value, index)) 
    .sortBy { case (k, (_, index)) => (index, k) } // sort by index first, key second 
    .map { case (k, (value, _)) => (k, value) } // remove index, it was just used for sorting 

This performs a single sort operation and does not require persisting the data. The DAG would look like this:

(4) MapPartitionsRDD[9] at map at Test.scala:75 [] 
| MapPartitionsRDD[8] at sortBy at Test.scala:74 [] 
| ShuffledRDD[7] at sortBy at Test.scala:74 [] 
+-(4) MapPartitionsRDD[4] at sortBy at Test.scala:74 [] 
    | MapPartitionsRDD[3] at flatMap at Test.scala:73 [] 
    | MapPartitionsRDD[2] at keyBy at Test.scala:72 [] 
    | MapPartitionsRDD[1] at map at Test.scala:71 [] 
    | ParallelCollectionRDD[0] at parallelize at Test.scala:64 [] 
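
As a quick sanity check of the single-sort pipeline above, here is a minimal sketch with a hypothetical two-row input (sc, the row contents, and the expected output in the comments are assumptions, not part of the original post):

// Hypothetical tiny input: two rows, 7 tab-separated columns, key in column 6. 
val rawinput = sc.parallelize(Seq( 
    "a0\ta1\ta2\ta3\ta4\ta5\tk1", 
    "b0\tb1\tb2\tb3\tb4\tb5\tk2")) 
// Applying the pipeline above to this rawinput yields (key, value) pairs 
// grouped by column index and sorted by key within each index: 
// (k1,a0), (k2,b0), (k1,a1), (k2,b1), ..., (k1,k1), (k2,k2) 
x.collect().foreach(println) 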

Thanks a lot Tzach, but if I use the cache or persist method, then how many shuffles are needed to sort the data? – Souvik


One, I believe. The printout might not reflect the fact that cached stages would be _skipped_. –


Hi Tzach, I have a question about the following statement: – Souvik