2016-05-26 108 views
2

我有一個RDD這樣的地圖數組:獲取最大的地圖

Map("id" -> 1, "name" -> "punit") 
Map("id" -> 2, "name" -> "naik", "ph_no" -> 123123) 

現在我的目標是編寫此陣圖的一個CSV文件,該文件看起來像這個:

id,ph_no,name 
1,,punit 
2,123123,naik 

ID 1沒有提供ph_no,這就是爲什麼它在CSV中爲空的原因。所以我想遍歷這個RDD並找到最大尺寸的Map,這樣我就可以通過提取它的關鍵字來命名標題中的所有字段。

斯卡拉而言這將是:

val x = Array(Map("id" -> 1, "name" -> "punit"),Map("id" -> 2, "name" -> "naik", "ph_no" -> 123123)).maxBy(_.size) 

這將正確地給我:

res0: scala.collection.immutable.Map[String,Any] = Map(id -> 2, name -> naik, ph_no -> 123123) 

我該怎麼辦呢?

回答

3

尋找最大尺寸的Map元素可能不夠準確,因爲它們都不具備所有數據(從示例中判斷)。 您可以通過在地圖中對所有不同的鍵進行聯合來獲取標題列表。 類似於:

val rddOfMaps:RDD[Map[String,Any]] = sc.parallelize(Seq(Map("a"->1, "b"->2, "d"->3),Map("a"->2, "c"->4, "e" -> 1))) 
val headers = rddOfMaps.flatMap(entry => entry.keySet).distinct.collect 
val csvData = rddOfMaps.map(entry => header.map(column => entry.get(column).getOrElse("")).mkString(",")) 

// 1,2,,3, 
// 2,,4,,1 
3

您可以使用.max(),指定按地圖大小排序。

scala> val rdd = sc.parallelize(Array(Map("id" -> 1, "name" -> "punit"),Map("id" -> 2, "name" -> "naik", "ph_no" -> 123123))) 
rdd: org.apache.spark.rdd.RDD[scala.collection.immutable.Map[String,Any]] = ParallelCollectionRDD[0] at parallelize at <console>:27 

scala> val maxMap = rdd.max()(Ordering.by(_.size)) 
maxMap: scala.collection.immutable.Map[String,Any] = Map(id -> 2, name -> naik, ph_no -> 123123) 

通過,因爲你與CSV文件的工作方式,你可以通過使用spark-csv感興趣。