2016-01-17 17 views
0

我正在使用Spark API for Java。我有一個JavaPairRDD,其中的密鑰k被壓縮爲一個字節序列。我想將解壓縮函數(我已經寫過)傳遞給KEY(而不是值)。這些密鑰在解壓縮後仍然是唯一的,我希望它們與它們的對應值配對,即將函數傳遞給JavaPairRDD中的KEY <K,V>

一種方法是對myHashMap = myPairRDD.collectAsMap()然後mySet = myHashMap.keySet(),但它不會再平行地完成,並且密鑰將會脫離他們的價值觀。

另一種方法是使用mySingleRDD = myPairRDD.keys()但隨後鍵會從相應的值被分離,訴

有沒有人有一個更好的辦法?

回答

0

這裏是僞代碼。將舊的RDD轉換爲新的RDD,其中新的RDD的密鑰是未壓縮的。

newRDD = oldRdd.map((key, value) => (decompress(key), value)) 
1

RDDS支持兩種類型的操作:轉換,從現有的創建新的數據集,並行動,其上運行的數據集的計算後的值返回驅動程序。

對於所提出的問題,您應該使用mapToPair,這是一種轉換,它通過(解壓縮)函數傳遞每個JavaPairRDD元素並返回一個新的JavaPairRDD。
結果RDD上的每個鍵/值條目類型爲Tuple2 <K, V>
在此,我用Tuple2<Object, Object>以鍵/值,也假設你有解壓()功能鍵:

的Java 8:

JavaPairRDD<Object, Object> result = pairRDD.mapToPair( 
       (Tuple2<Object, Object> pair) -> new Tuple2<Object, Object>(uncompress(pair._1()), pair._2())); 

的Java 6/7: (不能避免非蘭布達地獄......)

javaPair.mapToPair(new PairFunction<Tuple2<Object,Object>, Object, Object>() { 
    @Override 
    public Tuple2<Object, Object> call(Tuple2<Object, Object> pair) throws Exception { 
     return new Tuple2<Object, Object>(uncompress(pair._1()), pair._2()); 
    }});