2017-03-09 70 views
0

我有一個帶Integer鍵和Integer []值的PairRDD rdd1spark - 如何在另一個RDD的轉換內查找(Java)PairRDD的鍵和值

我也有另一個PairRDD rdd2與Integer鍵和Double值。

鍵中的每個整數AND值rdd1也作爲鍵存在於rdd2中。

我想爲rdd1每對(x, [y1,y2,...,yn])得到x的雙重價值的每個整數y1y2,...,yn所有的雙重價值。

我試圖收集rdd2作爲Map<Integer,Double>map2),但它不適合在內存中,我得到OOM錯誤。我也嘗試加入rdds,但我無法弄清楚如何加入密鑰和值。 的lookup()方法使用rdd1是不允許的。

的我想要的僞代碼如下:

map each (int x, int[] y) in rdd1 to: 
     (x, map2.get(x) + sum(map2.get(yi))) 

每個yiy

我使用Java,但我想在Java和Scala中都存在同樣的問題。

回答

1

根據您想要對丟失的匹配所做的操作(rdd1中存在索引並且rdd2中沒有相應索引的情況),查詢類似於以下內容。

rdd1. 
    // (x, [ y1, ..., yn ]) -> (x, x), (y1, x), ..., (yn, x) 
    flatMap { case (x, ys) => (x :: ys).map((_, x)) }. 
    // (xory, x) -> (xory, (x, rdd2.lookup(xory))) 
    leftOuterJoin(rdd2). 
    // (xory, (x, rdd2.lookup(xory))) -> (x, rdd2.lookup(xory)) 
    map(_._2). 
    // (x, rdd2.lookup(x)), ... -> (x, rdd2.lookup(x) + sum_i(rdd2.lookup(y_i)) 
    reduceByKey{ case (dopt1, dopt2) => (dopt1 ++ dopt2).reduceOption(_ + _) }. 
    // unwrap the option types 
    mapValues(_.getOrElse(0.0)) 
-1
HashMap<Integer, List<Integer>> map = new HashMap<>(); 
    map.put(1,asList(2,3)); 
    map.put(3,asList(4,5)); 

    System.out.println(
      map.entrySet().stream() 
        .flatMap(kv -> 
          Stream.concat(
            Stream.of((double)kv.getKey()), 
            kv.getValue().stream().mapToDouble(x -> Double.valueOf((double)x)).boxed()) 
        ) 
        .collect(Collectors.toList()) 
      ); 

這個怎麼樣? ...應該在一個RDD中爲您提供所有(鍵和值),您可以在第二個RDD中將它們用作鍵。你當然可以改變類型。

相關問題