Can anyone briefly explain what CoGroupedRDD is used for? In the code below, what does CoGroupedRDD do with the two RDDs?
val schema = "some_schema"
val RDD = {sc.cassandraTable[(String, String, Int, Int, Int, Int)](schema, "Event_table").select("column1" as "_1", "column2" as "_2", "column3" as "_3", "column4" as "_4", "column5" as "_5","column6" as "_6").keyBy[Tuple2[Int,Int]]("column5","column6")}
val RDD2 = {sc.cassandraTable[(Int,Int)](schema, "crew_table").select ("crewid_1" as "_1", "crewid_2" as "_2", "crewid_desc").keyBy[Tuple2[Int, Int]]("crewid_1", "crewid_2")}
val joinedRDD = RDD.leftOuterJoin(RDD2)
joinedRDD.take(10).foreach(println)
val RDD3 = {sc.cassandraTable[(Int,String)](schema, "Crew").select ("crewid_1" as "_1", "crewid_2" as "_2").keyBy[Tuple1[Int]]("crew_id")}
val mjoin = joinedRDD.map { x => (x._1._1, x._2) }
val result = mjoin.join(RDD3)
result.toDebugString
res19: String =
(6) MapPartitionsRDD[27] at leftOuterJoin at <console>:66 []
 |  MapPartitionsRDD[26] at leftOuterJoin at <console>:66 []
 |  CoGroupedRDD[25] at leftOuterJoin at <console>:66 []
 +-(6) MapPartitionsRDD[21] at map at <console>:60 []
 |  |  MapPartitionsRDD[17] at leftOuterJoin at <console>:58 []
 |  |  MapPartitionsRDD[16] at leftOuterJoin at <console>:58 []
 |  |  CoGroupedRDD[15] at leftOuterJoin at <console>:58 []
 |  +-(6) CassandraTableScanRDD[2] at RDD at CassandraRDD.scala:15 []
 |  +-(6) CassandraTableScanRDD[5] at RDD at CassandraRDD.scala:15 []
 +-(6) CassandraTableScanRDD[11] at RDD at CassandraRDD.scala:15 []
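
For context on the lineage above: in Spark, the pair-RDD operations join and leftOuterJoin are built on cogroup, which is why each join shows up as a CoGroupedRDD followed by MapPartitionsRDD stages. The sketch below is only a rough model of that behaviour on local Scala collections, not Spark's implementation. It ignores partitioning and shuffling entirely, and the object name CoGroupSketch and the sample data in the usage note are made up for illustration.

```scala
// A plain-Scala model of cogroup semantics (hypothetical helper, no Spark dependency).
object CoGroupSketch {

  // For every key that appears on either side, collect the values from the
  // left input and from the right input into a pair of sequences.
  def cogroup[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Map[K, (Seq[V], Seq[W])] = {
    val l = left.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2)) }
    val r = right.groupBy(_._1).map { case (k, kws) => (k, kws.map(_._2)) }
    (l.keySet ++ r.keySet).map { k =>
      (k, (l.getOrElse(k, Seq.empty[V]), r.getOrElse(k, Seq.empty[W])))
    }.toMap
  }

  // Left outer join expressed on top of cogroup: every left value is kept,
  // paired with None when the right side has no value for that key.
  def leftOuterJoin[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, Option[W]))] =
    cogroup(left, right).toSeq.flatMap { case (k, (vs, ws)) =>
      if (ws.isEmpty) vs.map(v => (k, (v, Option.empty[W])))
      else for (v <- vs; w <- ws) yield (k, (v, Some(w): Option[W]))
    }

  // Inner join expressed on top of cogroup: only keys with values on both
  // sides produce output, one row per (left value, right value) combination.
  def join[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] =
    cogroup(left, right).toSeq.flatMap { case (k, (vs, ws)) =>
      for (v <- vs; w <- ws) yield (k, (v, w))
    }
}
```

With made-up inputs such as Seq((1, "a"), (2, "b")) and Seq((1, "x")), cogroup yields an entry for key 2 whose right-hand sequence is empty, and leftOuterJoin turns that entry into (2, ("b", None)). Read this way, the CoGroupedRDD in the debug string corresponds to the grouping step, and the MapPartitionsRDD stages above it correspond to flattening the grouped values into join rows.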