2015-12-15 45 views
2

我想在scala中完成兩個RDD的聯合。我無法將它們中的任何一個存儲在內存中,因爲它們非常大。兩個RDD元素的聯合

A = {k1->List(A,B,C), k2->List(W,E,Q)} 
B = {k1->List(D,E,F), k2->List(E,U,O)} 

我該怎麼辦A和B的工會獲得

{(A,B,C,D,E,F),(W,E,Q,U,O)} 

感謝, 南希

+0

你能寫出真實的代碼嗎?因爲'.union'完全符合你的要求,我擔心你的RDD與你所描述的不同。 – Markon

+0

我有兩個RDD是這樣的: List(String)>我已經按鍵排序了它們。我想要做的是兩個RDD值的列聯合。 val a3 = a2.map {case(k,v)=> k-> v.map {case(ki,vi)=> vi}}。sortByKey(true)val a31 = a21.map {case(k,v )=> k-> v.map {case(ki,vi)=> ki}}。sortByKey(true)a3.union(a31) – Nancy

+0

請更新您的問題。 – Markon

回答

1

至於我能告訴你只需要一個join

val a = sc.parallelize(Seq(
    ("k1" -> List("A", "B" , "C")), ("k2" -> List("W", "E", "Q")))) 
val b = sc.parallelize(Seq(
    ("k1" -> List("D", "E", "F")), ("k2" -> List("E", "U", "O")))) 

val combined = a.join(b) // Join by key 
    .values // drop keys 
    .map{case (x, y) => x ++ y} // Combine elements 
1

注:本答案與所述問題匹配的4版本。從那以後,問題發生了變化。我沒有刪除了答案,因爲使用zip

約陷阱一些意見

你可以使用zip是:

val rdd1 = sc.parallelize(Seq("A", "B", "C")) 
val rdd2 = sc.parallelize(Seq("D", "E", "F")) 

val zipped = rdd1.zip(rdd2) 

導致

scala>zipped.collect().foreach(println) 
(A,D) 
(B,E) 
(C,F) 
+0

我看起來不像OP想要的東西。不要在Spark中提到'zip'是相當棘手的。 – zero323

+0

@ zero323它與問題的第4版相匹配 - 從那以後問題發生了變化。在哪些情況下'拉鍊'棘手? – Beryllium

+0

「zip」的問題在於它需要相同數量的分區(簡單部分)和每個分區(硬部分)上相同數量的元素。它適用於從同一血統出現的RDD,但總的來說這種方法毫無用處。 – zero323