2017-06-13 49 views
0

我有兩個RDDS用以下結構:Spark中加入兩個RDDS然後消除鍵

rdd1<String, String>: (str01, str12), (str01, str13), (str02, str13), .. 
rdd2<String, Float>: (str01, 0.1), (str02, 0.3), .. 

我想加入這些RDDS有一個新的RDD其中str01,在RDD1集str02被取代的在RDD2值,如下所示:

rdd3<String, Float>: (str12, 0.1), (str13, 0.1), (str13, 0.3) 

然後,我需要通過按鍵來減少這種RDD如下:

rdd4<String, Float>: (str12, 0.1), (str13, 0.1+0.3 = 0.4) 

我TR ied左右外連接,但以RDD結束 任何想法如何解決此問題?

回答

0

這有助於您的問題。

val map1=List("str01" -> "str12", "str01" -> "str13", "str02" -> "str13") 
val map2=List("str01"->0.1, "str02"->0.3) 

val rdd1=sc.parallelize(map1) 
val rdd2=sc.parallelize(map2) 

val joinedrdd = rdd1.join(rdd2).map(x=> x._2) 
val r = joinedrdd.reduceByKey(_+_) 

這RDD r具有結構爲:RDD[(String, Double)]

相關問題