2015-12-18 60 views
4

我有型RDD的4rdds:((INT,INT,INT)值)和我的RDDS是加入多個RDDS

rdd1: ((a,b,c), value) 
rdd2:((a,d,e),valueA) 
rdd3:((f,b,g),valueB) 
rdd4:((h,i,c),valueC) 

如何加入像RDD1集的RDDS上 「一」 RDD1集加入RDD2在「b」上加入rdd2,在「c」上加入rdd1 rdd3

所以在Scala中的輸出是finalRdd: ((a,b,c),valueA,valueB,valueC,value))

我試着用collectAsMap這樣做,但它沒有很好地工作,並拋出異常

代碼只是爲了RDD1集加入RDD2

val newrdd2=rdd2.map{case((a,b,c),d)=>(a,d)}.collectAsMap 
val joined=rdd1.map{case((a,b,c),d)=>(newrdd2.get(a).get,b,c,d)} 

例如

rdd1: ((1,2,3),animals) 
rdd2:((1,anyInt,anyInt),cat) 
rdd3:((anyInt,2,anyInt),cow) 
rdd 4: ((anyInt,anyInt,3),parrot) 

輸出應該是((1,2,3),animals,cat,cow,parrot)

+0

你能否寫一個更好的例子,數據裏面的rdds? –

+0

我添加了一個例子,它不關心anyInt字段上的數字是什麼 – luis

+1

rdd1中是否有重複的行?重複密鑰? (例如,具有「(1,2,3)」和值「動物」和「另一種動物」的兩個元素) –

回答

1

有一個方便的join方法,但您需要通過您特定的連接鍵來鍵入它,這是Spark用於分區和混排的關鍵。

the docs

加入(otherDataset,[numTasks]):當對類型的數據集稱爲(K,V)和(K,W),則返回(K,(V的一個數據集, W))與每個鍵的所有元素對配對。外連接通過leftOuterJoin,rightOuterJoin和fullOuterJoin支持。

我不能編譯我在哪裏,但一方面它是這樣的:

val rdd1KeyA = rdd1.map(x => (x._1._1, (x._1._2, x._1._3. x._2) // RDD(a, (b,c,value)) 
val rdd2KeyA = rdd2.map(x => (x._1._1, x._2) // RDD(a, valueA) 
val joined1 = rdd1KeyA.join(rdd2KeyA) // RDD(a, ((b,c,value), valueA)) 

val rdd3KeyB = rdd3.map(x => (x._1._2, x._2) // RDD(b, valueB) 
val joined1KeyB = joined1.map(x => (x._2._1._1, (x._1, x._2._1._2, x._2._1._3. x._2._2) // RDD(b, (a, c, value, valueA)) 
val joined2 = joined1KeyB.join(rdd3keyB) // RDD(b, ((a, c, value, valueA), valueB)) 

...等等

避免collect*功能,因爲它們不使用您的數據的分佈式特性,並且在大負載時很容易失敗,它們將RDD上的所有數據混洗到主節點上的內存中集合,可能會將所有內容都吹起來。