2015-12-04 39 views
1

我有3個RDD類似的分區鍵(假設爲idOwner一個String):

JavaPairRDD<PartitionKey, Iterable<Cat>> rddCat 
JavaPairRDD<PartitionKey, Iterable<Dog>> rddDog 
JavaPairRDD<PartitionKey, Iterable<Fish>> rddFish 

如何去預期的解決方案:

JavaPairRDD<PartitionKey, Tuple3<Iterable<Cat>, Iterable<Dog>, Iterable<fish>>> 

我只能勉強做到這一點,如何將三個RDD組合在一起?

失敗1:

rddCat.cogroup(rddDog, rddFish) 
--> FlatMapFunction<Tuple2<PartitionKey, Tuple3<Iterable<Iterable<Cat>>, Iterable<Iterable<Dog>>, Iterable<Iterable<Fish>>>> 

故障2:

JavaPairRDD<PartitionKey, Tuple2<Iterable<Cat>, Iterable<Dog>>> catDogRdd = rddCat.join(rddDog); 
JavaPairRDD<PartitionKey, Tuple2<Tuple2<Iterable<Cat>, Iterable<Dog>>, Iterable<Fish>>> finalRdd = catDogRdd.join(rddFish); 

回答

0

TL;博士使用join,即def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))])。

我使用Scala和以下似乎工作正常。

scala> r2.collect 
res7: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(0, 1)), (3,CompactBuffer(6, 7)), (4,CompactBuffer(8, 9)), (1,CompactBuffer(2, 3)), (2,CompactBuffer(4, 5))) 

scala> r3.collect 
res8: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(0, 1, 2)), (3,CompactBuffer(9)), (1,CompactBuffer(3, 4, 5)), (2,CompactBuffer(6, 7, 8))) 

scala> r5.collect 
res9: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(0, 1, 2, 3, 4)), (1,CompactBuffer(5, 6, 7, 8, 9))) 

scala> r2 join r3 join r5 collect 
res10: Array[(Int, ((Iterable[Int], Iterable[Int]), Iterable[Int]))] = Array((0,((CompactBuffer(0, 1),CompactBuffer(0, 1, 2)),CompactBuffer(0, 1, 2, 3, 4))), (1,((CompactBuffer(2, 3),CompactBuffer(3, 4, 5)),CompactBuffer(5, 6, 7, 8, 9)))) 

諮詢org.apache.spark.rdd.PairRDDFunctions

+0

謝謝你,但是如果我使用我加盟只能管理有:'JavaPairRDD ,可迭代 >> catDogRdd = rddCat.join(rddDog);'然後'JavaPairRDD ,Iterable >,Iterable >> finalRdd = catDogRdd.join(rddFish);' – Fundhor

+0

看起來不錯,有什麼問題? – Reactormonk

+0

@Reactormonk我期望tuple3,而不是tuple2在antoher tuple2中 – Fundhor

0

我設法與番石榴的一點幫助做到這一點:

//given 
    final JavaPairRDD<Character, Iterable<Integer>> rdd1 = ... 
    final JavaPairRDD<Character, Iterable<Integer>> rdd2 = ... 
    final JavaPairRDD<Character, Iterable<Integer>> rdd3 = ... 

    // when 
    final JavaPairRDD<Character, Tuple3<Iterable<Iterable<Integer>>, Iterable<Iterable<Integer>>, Iterable<Iterable<Integer>>>> grouped = rdd1.cogroup(rdd2, rdd3); 
    final JavaPairRDD<Character, Tuple3<Iterable<Integer>, Iterable<Integer>, Iterable<Integer>>> flattened = grouped.mapValues(
      t3 -> new Tuple3<>(Iterables.concat(t3._1()), Iterables.concat(t3._2()), Iterables.concat(t3._3())) 
    ); 

不知@Fundhor你如何管理你的第一個嘗試,獲得這個簽名。這似乎不可能。

相關問題