2016-03-13 53 views
1

我加入了大量的rdd,我想知道是否有刪除每個連接上創建的括號的通用方法。加入RDD後刪除parentesis

這裏是一個小例子:

val rdd1 = sc.parallelize(Array((1,2),(2,4),(3,6))) 
val rdd2 = sc.parallelize(Array((1,7),(2,8),(3,6))) 
val rdd3 = sc.parallelize(Array((1,2),(2,4),(3,6))) 

val result = rdd1.join(rdd2).join(rdd3) 

res: result: org.apache.spark.rdd.RDD[(Int, ((Int, Int), Int))] = Array((1,((2,7),2)), (3,((4,8),4)), (3,((4,8),6)), (3,((4,6),4)), (3,((4,6),6))) 

我知道我可以使用地圖

result.map((x) => (x._1,(x._2._1._1,x._2._1._2,x._2._2))).collect 

Array[(Int, (Int, Int, Int))] = Array((1,(2,7,2)), (2,(4,8,4)), (3,(6,6,6))) 

,但有大量RDD的每一個包含很多元素的它很快變得很難使用這種方法

回答

1

隨着大量的rdd的每個包含很多元素這種方法根本無法工作,因爲最大的buil t-in元組仍然是Tuple22。如果你加入均質RDD某種類型的序列:

def joinAndMerge(rdd1: RDD[(Int, Seq[Int])], rdd2: RDD[(Int, Seq[Int])]) = 
    rdd1.join(rdd2).mapValues{ case (x, y) => x ++ y } 

Seq(rdd1, rdd2, rdd3).map(_.mapValues(Seq(_))).reduce(joinAndMerge) 

如果你只有三個RDDS它可以清潔使用cogroup

rdd1.cogroup(rdd2, rdd3) 
    .flatMapValues { case (xs, ys, zs) => for { 
    x <- xs; y <- ys; z <- zs 
    } yield (x, y, z) } 

如果值是異質它使得使用DataFrames更有意義:

def joinByKey(df1: DataFrame, df2: DataFrame) = df1.join(df2, Seq("k")) 

Seq(rdd1, rdd2, rdd3).map(_.toDF("k", "v")).reduce(joinByKey)