可以說我有2 RDDs
:如何星火合併兩個RDDS如果存儲在關鍵值相匹配
rdd1 = [ (key1, value1), (key2, value2), (key3, value3) ]
rdd2 = [ (key4, value4), (key5, value5), (key6, value6) ]
我要合併的RDDS當且僅當儲存在KEY1在RDD1集=價值=在rdd2中存儲在key5中的值。
我該如何去使用Java或Scala在Spark中做這件事?
可以說我有2 RDDs
:如何星火合併兩個RDDS如果存儲在關鍵值相匹配
rdd1 = [ (key1, value1), (key2, value2), (key3, value3) ]
rdd2 = [ (key4, value4), (key5, value5), (key6, value6) ]
我要合併的RDDS當且僅當儲存在KEY1在RDD1集=價值=在rdd2中存儲在key5中的值。
我該如何去使用Java或Scala在Spark中做這件事?
我認爲你正在尋找一個加入。
您需要做的第一件事是將它們映射到PairRDD,並將key1,key2等作爲鍵。本例使用Tuple2輸入:
JavaPairRDD<Integer, String> pairRdd = rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, Integer, String>() {
public Tuple2<Integer, String> call(Tuple2<Integer, String> val) throws Exception {
return new Tuple2<Integer, String>(val._1(), val._2());
}
});
一旦你映射兩個,你只需要通過按鍵來加入他們的行列:
JavaPairRDD<Integer, Tuple2<String, String>> combined = pairRdd.join(pairRdd2);
然後,合併將是這樣的:
[ (key1, (value1, value5)), (key2, (value2, value4)) ]
其中key1 == key5和key2 == key4
我給你scala spark的解決方案如下
scala> val rdd1 = sc.parallelize(List((3,"s"),(2,"df"),(1,"i")))
scala> val rdd2 = sc.parallelize(List((1,"ds"),(2,"h"),(1,"i")))
scala> val swaprdd1=rdd1.map(_.swap)
scala> val swaprdd2=rdd2.map(_.swap)
scala> val intersectrdd = rdd1.intersection(rdd2)
scala> val resultrdd = intersectrdd.map(_.swap)
我希望它對您的解決方案有所幫助:)