2015-04-02 31 views
2

在我的Spark應用程序中,我使用了一個數據量很大的JavaPairRDD<Integer, List<Tuple3<String, String, String>>>如何根據密鑰從PairRDD獲取新的RDD

而我的要求是,我需要一些基於密鑰的Large PairRDD的其他RDDs JavaRDD<Tuple3<String, String, String>>

+0

爲什麼不只是過濾base rdd? – Krever 2015-04-02 10:27:16

+0

使用java.util.stream.Stream過濾數據。請看看[鏈接](http://www.journaldev.com/2389/java-8-features-for-developers-lambdas-functional-interface-stream-and-time-api#java-stream-api) – 2015-04-02 10:31:39

+0

在PairRDD中,我使用具有數百萬個Tuple3 的List。但是根據Tuple的第三個參數,我只需要該列表中的50個排序記錄。 所以對此,我只是想創建一些新的Rdds,而不是在排序後有元組。 如果有其他方法,請告訴我。 – Kaushal 2015-04-02 10:37:11

回答

3

我不知道Java的API,但這裏是你會怎麼做,在斯卡拉(在spark-shell):

def rddByKey[K: ClassTag, V: ClassTag](rdd: RDD[(K, Seq[V])]) = { 
    rdd.keys.distinct.collect.map { 
    key => key -> rdd.filter(_._1 == key).values.flatMap(identity) 
    } 
} 

你必須filter每個鍵與flatMap扁平化List秒。我不得不提的是,這不是一個有用的操作。如果您能夠構建原始RDD,那意味着每個List都足夠小以適應內存。所以我不明白你爲什麼想把它們變成RDD。

+0

也許我誤解了這個問題?讓我知道。 – 2015-04-02 12:13:16