2016-12-09

I would like to understand the log output generated by the simple Spark program below. Help walking through each step, or a pointer to a reference that explains it, would be appreciated.

Command: sc.parallelize(Array(("a", 1), ("b", 1), ("a", 1), ("a", 1), ("b", 1), ("b", 1), ("b", 1), ("b", 1)), 3).map(a => a).reduceByKey(_ + _).collect()

Output:

16/12/08 23:41:57 INFO spark.SparkContext: Starting job: collect at <console>:28 
16/12/08 23:41:57 INFO scheduler.DAGScheduler: Registering RDD 1 (map at <console>:28) 
16/12/08 23:41:57 INFO scheduler.DAGScheduler: Got job 0 (collect at <console>:28) with 3 output partitions 
16/12/08 23:41:57 INFO scheduler.DAGScheduler: Final stage: ResultStage 1 (collect at <console>:28) 
16/12/08 23:41:57 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 0) 
16/12/08 23:41:57 INFO scheduler.DAGScheduler: Missing parents: List(ShuffleMapStage 0) 
16/12/08 23:41:57 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[1] at map at <console>:28), which has no missing parents 
16/12/08 23:41:57 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.6 KB, free 2.6 KB) 
16/12/08 23:41:57 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1588.0 B, free 4.2 KB) 
16/12/08 23:41:57 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 172.17.0.6:31122 (size: 1588.0 B, free: 511.5 MB) 
16/12/08 23:41:57 INFO spark.SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006 
16/12/08 23:41:57 INFO scheduler.DAGScheduler: Submitting 3 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[1] at map at <console>:28) 
16/12/08 23:41:57 INFO cluster.YarnScheduler: Adding task set 0.0 with 3 tasks 
16/12/08 23:41:57 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 34b943b3f6ea, partition 0,PROCESS_LOCAL, 2183 bytes) 
16/12/08 23:41:57 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 34b943b3f6ea, partition 1,PROCESS_LOCAL, 2199 bytes) 
16/12/08 23:41:57 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 34b943b3f6ea:28772 (size: 1588.0 B, free: 511.5 MB) 
16/12/08 23:41:57 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 34b943b3f6ea:39570 (size: 1588.0 B, free: 511.5 MB) 
16/12/08 23:41:58 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, 34b943b3f6ea, partition 2,PROCESS_LOCAL, 2200 bytes) 
16/12/08 23:41:58 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 740 ms on 34b943b3f6ea (1/3) 
16/12/08 23:41:58 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 778 ms on 34b943b3f6ea (2/3) 
16/12/08 23:41:58 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 66 ms on 34b943b3f6ea (3/3) 
16/12/08 23:41:58 INFO scheduler.DAGScheduler: ShuffleMapStage 0 (map at <console>:28) finished in 0.792 s 
16/12/08 23:41:58 INFO scheduler.DAGScheduler: looking for newly runnable stages 
16/12/08 23:41:58 INFO scheduler.DAGScheduler: running: Set() 
16/12/08 23:41:58 INFO scheduler.DAGScheduler: waiting: Set(ResultStage 1) 
16/12/08 23:41:58 INFO scheduler.DAGScheduler: failed: Set() 
16/12/08 23:41:58 INFO scheduler.DAGScheduler: Submitting ResultStage 1 (ShuffledRDD[2] at reduceByKey at <console>:28), which has no missing parents 
16/12/08 23:41:58 INFO cluster.YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool 
16/12/08 23:41:58 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.6 KB, free 6.7 KB) 
16/12/08 23:41:58 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1589.0 B, free 8.3 KB) 
16/12/08 23:41:58 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on 172.17.0.6:31122 (size: 1589.0 B, free: 511.5 MB) 
16/12/08 23:41:58 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006 
16/12/08 23:41:58 INFO scheduler.DAGScheduler: Submitting 3 missing tasks from ResultStage 1 (ShuffledRDD[2] at reduceByKey at <console>:28) 
16/12/08 23:41:58 INFO cluster.YarnScheduler: Adding task set 1.0 with 3 tasks 
16/12/08 23:41:58 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, 34b943b3f6ea, partition 1,NODE_LOCAL, 1894 bytes) 
16/12/08 23:41:58 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 1.0 (TID 4, 34b943b3f6ea, partition 2,NODE_LOCAL, 1894 bytes) 
16/12/08 23:41:58 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on 34b943b3f6ea:39570 (size: 1589.0 B, free: 511.5 MB) 
16/12/08 23:41:58 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on 34b943b3f6ea:28772 (size: 1589.0 B, free: 511.5 MB) 
16/12/08 23:41:58 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 34b943b3f6ea:60986 
16/12/08 23:41:58 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 163 bytes 
16/12/08 23:41:58 INFO spark.MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to 34b943b3f6ea:60984 
16/12/08 23:41:58 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 5, 34b943b3f6ea, partition 0,PROCESS_LOCAL, 1894 bytes) 
16/12/08 23:41:58 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 1.0 (TID 4) in 331 ms on 34b943b3f6ea (1/3) 
16/12/08 23:41:58 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 351 ms on 34b943b3f6ea (2/3) 
16/12/08 23:41:58 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 5) in 29 ms on 34b943b3f6ea (3/3) 
16/12/08 23:41:58 INFO scheduler.DAGScheduler: ResultStage 1 (collect at <console>:28) finished in 0.359 s 
16/12/08 23:41:58 INFO cluster.YarnScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool 
16/12/08 23:41:58 INFO scheduler.DAGScheduler: Job 0 finished: collect at <console>:28, took 1.381102 s 
res14: Array[(String, Int)] = Array((a,3), (b,5)) 

Answer

  1. As you can see, nothing runs until collect(): Spark evaluates lazily. Even though map and reduceByKey are called first, the job is launched only by collect, because map and reduceByKey are transformations, while collect is an action.

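The laziness can be illustrated without Spark. A Scala Iterator behaves analogously: map records the operation but runs nothing until a terminal call forces it. This is a plain-Scala analogy, not Spark's actual machinery:

```scala
// Plain-Scala analogy for lazy evaluation: map on an Iterator defers work
// until a terminal operation (playing the role of collect in Spark).
var evaluated = false
val mapped = Iterator(("a", 1), ("b", 1)).map { kv => evaluated = true; kv }
println(evaluated)         // false: the "transformation" has not run yet
val result = mapped.toList // the "action": forces evaluation
println(evaluated)         // true
```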
  2. You can see 3 tasks in each stage because the RDD was created with 3 partitions, and Spark schedules one task per partition.

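Why three tasks: parallelize splits the 8 elements evenly across the 3 requested partitions. A sketch of the split positions (mirroring what Spark's ParallelCollectionRDD does internally; the helper name here is ours, not a Spark API):

```scala
// Hypothetical helper mirroring Spark's even slicing of a local collection
// into numSlices partitions; Spark runs one task per resulting partition.
def slicePositions(length: Int, numSlices: Int): Seq[(Int, Int)] =
  (0 until numSlices).map { i =>
    ((i * length) / numSlices, ((i + 1) * length) / numSlices)
  }

val data = Seq(("a", 1), ("b", 1), ("a", 1), ("a", 1),
               ("b", 1), ("b", 1), ("b", 1), ("b", 1))
val parts = slicePositions(data.length, 3).map { case (s, e) => data.slice(s, e) }
println(parts.map(_.length)) // partition sizes 2, 3, 3: one task each
```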
  3. Another point to notice is how data locality is handled for map and reduceByKey. All three map tasks run with PROCESS_LOCAL locality, since each task reads the partition held by its own executor. reduceByKey requires a shuffle, so in its stage you may see a mix of PROCESS_LOCAL and NODE_LOCAL tasks, as in stage 1 of the log.
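The shuffle side can also be sketched. reduceByKey routes each key to a reducer partition by hashing; Spark's default HashPartitioner takes the key's hashCode modulo the partition count. Under that assumption, "a" and "b" land in partitions 1 and 2, leaving partition 0 empty, which lines up with stage 1 in the log: tasks 1.0 and 2.0 are NODE_LOCAL and do the real work, while task 0.0 is PROCESS_LOCAL and finishes in only 29 ms.

```scala
// Sketch of reduceByKey's shuffle routing, assuming Spark's default
// HashPartitioner: partition = nonNegativeMod(key.hashCode, numPartitions).
val data = Seq(("a", 1), ("b", 1), ("a", 1), ("a", 1),
               ("b", 1), ("b", 1), ("b", 1), ("b", 1))
val numPartitions = 3
def partitionOf(key: String): Int =
  ((key.hashCode % numPartitions) + numPartitions) % numPartitions

println(partitionOf("a")) // 1: every ("a", _) record is shuffled to reducer 1
println(partitionOf("b")) // 2: every ("b", _) record goes to reducer 2

// Each reducer then sums the values for the keys it received:
val reduced = data
  .groupBy { case (k, _) => k }
  .map { case (k, vs) => (k, vs.map(_._2).sum) }
println(reduced) // contains a -> 3 and b -> 5, matching res14 in the log
```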