2017-03-01

Can anyone explain in simple terms what CoGroupedRDD is used for? What does CoGroupedRDD do with the two RDDs in the code below?

val schema = "some_schema"
val RDD = sc.cassandraTable[(String, String, Int, Int, Int, Int)](schema, "Event_table").select("column1" as "_1", "column2" as "_2", "column3" as "_3", "column4" as "_4", "column5" as "_5", "column6" as "_6").keyBy[Tuple2[Int, Int]]("column5", "column6")
val RDD2 = sc.cassandraTable[(Int, Int)](ks, "crew_table").select("crewid_1" as "_1", "crewid_2" as "_2", "crewid_desc").keyBy[Tuple2[Int, Int]]("crewid_1", "crewid_2")
val joinedRDD = RDD.leftOuterJoin(RDD2)
joinedRDD.take(10).foreach(println)
val RDD3 = sc.cassandraTable[(Int, String)](ks, "Crew").select("crewid_1" as "_1", "crewid_2" as "_2").keyBy[Tuple1[Int]]("crew_id")
val mjoin = joinedRDD.map { x => (x._1._1, x._2) }

val result = mjoin.join(RDD3)
result.toDebugString
res19: String = 
(6) MapPartitionsRDD[27] at leftOuterJoin at <console>:66 []
 |  MapPartitionsRDD[26] at leftOuterJoin at <console>:66 []
 |  CoGroupedRDD[25] at leftOuterJoin at <console>:66 []
 +-(6) MapPartitionsRDD[21] at map at <console>:60 []
 |  |  MapPartitionsRDD[17] at leftOuterJoin at <console>:58 []
 |  |  MapPartitionsRDD[16] at leftOuterJoin at <console>:58 []
 |  |  CoGroupedRDD[15] at leftOuterJoin at <console>:58 []
 |  +-(6) CassandraTableScanRDD[2] at RDD at CassandraRDD.scala:15 []
 |  +-(6) CassandraTableScanRDD[5] at RDD at CassandraRDD.scala:15 []
 +-(6) CassandraTableScanRDD[11] at RDD at CassandraRDD.scala:15 []

Answer

In its simplest form, cogroup has the following signature:

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] 

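where "self" is an RDD[(K, V)]. Put simply, it takes RDDs of key-value pairs and groups the values by key, keeping the values from each source logically separated: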

val rdd1 = sc.parallelize(Seq((1, "foo"), (1, "bar"), (2, "foobar"))) 
val rdd2 = sc.parallelize(Seq((1, 1), (1, 2), (3, 3))) 
rdd1.cogroup(rdd2).collect.foreach(println) 
(1,(CompactBuffer(foo, bar),CompactBuffer(1, 2))) 
(2,(CompactBuffer(foobar),CompactBuffer())) 
(3,(CompactBuffer(),CompactBuffer(3))) 

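This mechanism is used to implement joins, which is why a CoGroupedRDD appears in the lineage under each leftOuterJoin in your toDebugString output. Once the data has been cogrouped, you can flatten each group with

for { lv <- lvs; rv <- rvs } yield (key, (lv, rv))

to get an inner join. Outer joins follow the same procedure, with a small adjustment for empty sequences.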
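
As a rough sketch of the idea above (not a copy of Spark's source), the inner and left outer joins can be rebuilt from cogroup and checked against the built-in join and leftOuterJoin. The object name CogroupJoinSketch, the local[2] master, and the app name are placeholders I made up for illustration; in spark-shell you would drop them and use the existing sc.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical wrapper object so the sketch runs outside spark-shell.
object CogroupJoinSketch {
  def main(args: Array[String]): Unit = {
    // Assumed local setup; in spark-shell `sc` already exists.
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("cogroup-join-sketch"))
    try {
      val rdd1 = sc.parallelize(Seq((1, "foo"), (1, "bar"), (2, "foobar")))
      val rdd2 = sc.parallelize(Seq((1, 1), (1, 2), (3, 3)))

      // RDD[(Int, (Iterable[String], Iterable[Int]))]: values grouped per key, per source.
      val grouped = rdd1.cogroup(rdd2)

      // Inner join: the nested loop yields nothing when either side is empty,
      // so keys present in only one RDD drop out.
      val inner = grouped.flatMapValues { case (lvs, rvs) =>
        for { lv <- lvs; rv <- rvs } yield (lv, rv)
      }

      // Left outer join: the "small adjustment" is to keep every left value
      // and pair it with None when the right side is empty.
      val leftOuter = grouped.flatMapValues { case (lvs, rvs) =>
        if (rvs.isEmpty) lvs.map(lv => (lv, Option.empty[Int]))
        else for { lv <- lvs; rv <- rvs } yield (lv, Option(rv))
      }

      // Compare with the built-in operations, ignoring element order.
      assert(inner.collect().sorted.toSeq ==
        rdd1.join(rdd2).collect().sorted.toSeq)
      assert(leftOuter.collect().sorted.toSeq ==
        rdd1.leftOuterJoin(rdd2).collect().sorted.toSeq)
    } finally {
      sc.stop()
    }
  }
}
```

Both derived RDDs are built with flatMapValues on top of the cogrouped data, which matches the shape of your lineage: a CoGroupedRDD followed by MapPartitionsRDDs for every join.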