2015-07-22 82 views
2

後,我映射我RDD到如何使用reduceByKey將值添加到Scala Spark中的Set中?

((_id_1, section_id_1), (_id_1, section_id_2), (_id_2, section_3), (_id_2, section_4)) 

我想reduceByKey

((_id_1, Set(section_id_1, section_id_2), (_id_2, Set(section_3, section_4))) 
val collectionReduce = collection_filtered.map(item => { 
     val extras = item._2.get("extras") 
     var section_id = "" 
     var extras_id = "" 
     if (extras != null) { 
     val extras_parse = extras.asInstanceOf[BSONObject] 
     section_id = extras_parse.get("guid").toString 
     extras_id = extras_parse.get("id").toString 
     } 
     (extras_id, Set {section_id}) 
    }).groupByKey().collect() 

我的輸出

((_id_1, (Set(section_1), Set(section_2))), (_id_2, (Set(section_3), Set(section_4)))) 

我該如何解決呢?

在此先感謝。

回答

3

通過簡單地使用++來組合列表,您可以使用reduceByKey

val rdd = sc.parallelize((1, Set("A")) :: (2, Set("B")) :: (2, Set("C")) :: Nil) 
val reducedRdd = rdd.reduceByKey(_ ++ _) 
reducedRdd.collect() 
// Array((1,Set(A)), (2,Set(B, C))) 

在你的情況:

collection_filtered.map(item => { 
    // ... 
    (extras_id, Set(section_id)) 
}).reduceByKey(_ ++ _).collect() 
2

這裏是groupByKey/mapValues

val rdd = sc.parallelize(List(("_id_1", "section_id_1"), ("_id_1", "section_id_2"), ("_id_2", "section_3"), ("_id_2", "section_4"))) 

rdd.groupByKey().mapValues(v => v.toSet).foreach(println) 

這裏使用combineByKey另一替代(推薦超過groupByKey)的替代:

rdd.combineByKey(
     (value: String) => Set(value), 
     (x: Set[String], value: String) => x + value , 
     (x: Set[String], y:  Set[String]) => (x ++ y) 
    ).foreach(println) 
+0

上面使用reduceByKey和你的方式有什麼不同?哪一個更好?謝謝。 – giaosudau

+2

'groupByKey'在這裏更簡單,因爲您不會聚合(「* reduce *」)具有相同鍵的兩個鍵值對之間的任何信息:只需將這些值連接在一起。使用reduce的實現將做同樣的工作,只是稍微難以閱讀版本。 – huitseeker

+2

您應該比'groupByKey'更優先使用'reduceByKey',看看這個[Spark gitbook](http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html)說明。 –

相關問題