在我的Spark應用程序中,我使用了一個數據量很大的JavaPairRDD<Integer, List<Tuple3<String, String, String>>>
。如何根據密鑰從PairRDD獲取新的RDD
而我的要求是,我需要一些基於密鑰的Large PairRDD的其他RDDs JavaRDD<Tuple3<String, String, String>>
。
在我的Spark應用程序中,我使用了一個數據量很大的JavaPairRDD<Integer, List<Tuple3<String, String, String>>>
。如何根據密鑰從PairRDD獲取新的RDD
而我的要求是,我需要一些基於密鑰的Large PairRDD的其他RDDs JavaRDD<Tuple3<String, String, String>>
。
我不知道Java的API,但這裏是你會怎麼做,在斯卡拉(在spark-shell
):
def rddByKey[K: ClassTag, V: ClassTag](rdd: RDD[(K, Seq[V])]) = {
rdd.keys.distinct.collect.map {
key => key -> rdd.filter(_._1 == key).values.flatMap(identity)
}
}
你必須filter
每個鍵與flatMap
扁平化List
秒。我不得不提的是,這不是一個有用的操作。如果您能夠構建原始RDD,那意味着每個List
都足夠小以適應內存。所以我不明白你爲什麼想把它們變成RDD。
也許我誤解了這個問題?讓我知道。 – 2015-04-02 12:13:16
爲什麼不只是過濾base rdd? – Krever 2015-04-02 10:27:16
使用java.util.stream.Stream過濾數據。請看看[鏈接](http://www.journaldev.com/2389/java-8-features-for-developers-lambdas-functional-interface-stream-and-time-api#java-stream-api) – 2015-04-02 10:31:39
在PairRDD中,我使用具有數百萬個Tuple3的List。但是根據Tuple的第三個參數,我只需要該列表中的50個排序記錄。 所以對此,我只是想創建一些新的Rdds,而不是在排序後有元組。 如果有其他方法,請告訴我。 –
Kaushal
2015-04-02 10:37:11