2017-04-18 12 views

回答

2

首先,請注意,您不能加入​​。您需要通過使用獲得JavaPairRDD

  • groupBy()(或keyBy()
  • cartesian()
  • [flat]mapToPair()
  • zipWithIndex()(有用的,因爲它增加了索引,這是沒有的)

然後,一旦你有你的l IST,你可以加入他們全部是這樣的:

JavaPairRDD<Integer, String> linesA = sc.parallelizePairs(Arrays.asList(
              new Tuple2<>(1, "a1"), 
              new Tuple2<>(2, "a2"), 
              new Tuple2<>(3, "a3"), 
              new Tuple2<>(4, "a4"))); 
JavaPairRDD<Integer, String> linesB = sc.parallelizePairs(Arrays.asList(
              new Tuple2<>(1, "b1"), 
              new Tuple2<>(5, "b5"), 
              new Tuple2<>(3, "b3"))); 
JavaPairRDD<Integer, String> linesC = sc.parallelizePairs(Arrays.asList(
              new Tuple2<>(1, "c1"), 
              new Tuple2<>(5, "c6"), 
              new Tuple2<>(6, "c3"))); 

// the list of RDDs 
List<JavaPairRDD<Integer, String>> allLines = Arrays.asList(linesA, linesB, linesC); 

// since we probably don't want to modify any of the datasets in the list, we will 
// copy the first one in a separate variable to keep the result 
JavaPairRDD<Integer, String> res = allLines.get(0); 
for (int i = 1; i < allLines.size(); ++i) { // note we skip position 0 ! 
    res = res.join(allLines.get(i)) 
    /*[1]*/ .mapValues(tuple -> tuple._1 + ':' + tuple._2); 
} 

[1]線是重要的,因爲它映射一個

JavaPairRDD<Integer, Tuple2<String,String>>回一個

JavaPairRdd<Integer,String>這使得它與其它連接兼容。

基於chrisw答案,這可能被放入「一線」是這樣的:

JavaPairRDD<Integer, String> res; 
res = allLines.stream() 
       .reduce((rdd1, rdd2) -> rdd1.join(rdd2).mapValues(tup -> tup._1 + ':' + tup._2)) 
       .get(); // get value from Optional<JavaPairRDD> 

最後,對性能的一些想法。在上面的例子中,我使用了字符串連接來將連接的結果減少回相同類型的RDD。如果你有很多RDD,你可以通過使用for loop版本和JavaPairRDD<Integer, StringBuilder> res來加速這一點,在那裏你首先手動加入。如果需要,我會發布更多細節。

1

我不熟悉的JavaRDD類/接口,但也許你可以解決使用高階函數reduce在Java中8這個問題,看https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html

final List<JavaRDD> list = getList(); // where getList is your list implementation containing JavaRDD instances 

// The JavaRDD class provides rdd() to get the RDD 
final JavaRDD rdd = list.stream().map(JavaRDD::rdd).reduce(RDD::join); 

與String類的一個例子是是這樣的: -

Stream.of("foo", "bar", "baz").reduce(String::concat); 

將會產生

foob​​arbaz

+0

我想我承認這個想法,但JavaRDD ::沒有加入。 – SpiderRico

+1

我剛纔看了一下javadoc,如果'rdd()'方法返回RDD,那麼你可以使用map的高階函數,參見ammendment – chrisw