2016-02-18 25 views
0

我使用的火花,創造一個「倒排索引」,將映射一個英語令牌回哪裏令牌被發現documentIds。形式鑑於現有數據:Spark:使用groupByKey創建索引以生成排序的不同值列表?

documentId1, token 
documentId2, token 

我想創建密鑰的倒排索引,價值形態:

token, List(documentId1, documentId2, documentId3, ...) 

這裏的值是documentIds列表排序不同(唯一的)。

這裏是我到目前爲止:

// List of (documentId, token) pairs 
var data = Array((100, "spark"), (50, "spark"), (50, "spark"), (1, "apache"), (3, "apache"), (2, "apache")) 
var myrdd = sc.parallelize(data) 
var myrddGrouped = myrdd.map(pair => (pair._2, pair._1)).groupByKey() 
// myrddGrouped: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[226] at groupByKey at <console>:31 

myrddGrouped.foreach(println) 
// (apache,CompactBuffer(1, 3, 2)) 
// (spark,CompactBuffer(100, 50, 50)) 

正如你所看到的,我使用groupByKey(),但結果值是CompactBuffer,而不是一個列表。我怎樣才能對它應用「獨特」和「排序」?

+1

'''myrddGrouped.mapValues(_。toSeq.distinct.sorted)''' – emeth

+0

相反myrdd.map的'(對=>(pair._2,pair._1))'可以簡單地使用Tuple2的'swap'方法是:'myrdd.map(_.swap)' –

回答

4

我會建議彙集成一組,而不是使用groupByKey。這樣重複會聚集過程中被淘汰,那麼你就可以轉換成某種List並應用排序。

使用一些在意見的建議,也許像下面應該工作:

val input = sc.parallelize(Array((100, "spark"), (50, "spark"), (50, "spark"), (1, "apache"), (3, "apache"), (2, "apache"))) 
val setRDD = input.map(_.swap).aggregateByKey(Set[Int]())(_ ++ Set(_), _ ++ _) 
val sortedListRDD = setRDD.mapValues(_.toList.sorted) 

** sortedListRDD.foreach(println) 
** (spark,List(50, 100)) 
** (apache,List(1, 2, 3)) 

小記是因爲它們是不可變的,你應該申報的RDD的爲val。它可以重新分配var用新RDD但因爲你似乎並沒有那樣做,我只想用val的。

0
var data = Array((100, "spark"), (50, "spark"), (50, "spark"), (1, "apache"), (3, "apache"), (2, "apache")) 
var myrdd = sc.parallelize(data) 
var myrddGrouped = myrdd.map(pair => (pair._2, pair._1)).groupByKey().mapValues(_.toSet.toList).collect 

res141: Array[(String, List[Int])] = Array((spark,List(50, 100)), (apache,List(1, 2, 3)))