2016-12-20 125 views
1

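Spark DataFrame to nested map

How can I convert a rather small DataFrame in Spark (at most 300 MB) into a nested map in order to improve Spark's DAG? Because the transformed values were created during the training step of a custom estimator, I believe this operation will be faster than a join later on (Spark dynamic DAG is a lot slower and different from hard coded DAG). Now I just want to apply them quickly during the prediction step of the pipeline.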

val inputSmall = Seq(
    ("A", 0.3, "B", 0.25), 
    ("A", 0.3, "g", 0.4), 
    ("d", 0.0, "f", 0.1), 
    ("d", 0.0, "d", 0.7), 
    ("A", 0.3, "d", 0.7), 
    ("d", 0.0, "g", 0.4), 
    ("c", 0.2, "B", 0.25)).toDF("column1", "transformedCol1", "column2", "transformedCol2") 

The following gives the wrong type of map (one Map[String, Any] per row, keyed by column name):

val inputToMap = inputSmall.collect.map(r => Map(inputSmall.columns.zip(r.toSeq):_*)) 
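
To see the shape this produces without a Spark session, here is a minimal sketch using plain Scala collections. The names `columns`, `rows` and `perRow` are hypothetical stand-ins for `inputSmall.columns`, the collected rows and the result; the data is a two-row excerpt of the example above.

```scala
// Plain Scala stand-ins for inputSmall.columns and the collected rows (hypothetical).
val columns: Array[String] =
  Array("column1", "transformedCol1", "column2", "transformedCol2")
val rows: Array[Seq[Any]] = Array(
  Seq("A", 0.3, "B", 0.25),
  Seq("d", 0.0, "f", 0.1)
)

// Same expression as in the question: one Map per row, keyed by column name.
// Because each row mixes String and Double values, the value type widens to Any.
val perRow: Array[Map[String, Any]] = rows.map(r => Map(columns.zip(r): _*))

perRow.foreach(println)
```

Each element describes a single row, so nothing is grouped per column yet, which is why it does not match the nested structure asked for.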

I would rather have something like this:

Map[String, Map[String, Double]]("column1" -> Map("A" -> 0.3, "d" -> 0.0, ...), "column2" -> Map("B" -> 0.25, "g" -> 0.4, ...))

Answers

3

Edit: removed the collect operation from the final map.

If you are using Spark 2+, here is a suggestion:

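// map builds a MapType column from a key column and a value column;
// the $"..." syntax needs spark.implicits._ (already imported in spark-shell)
import org.apache.spark.sql.functions.map

val inputToMap = inputSmall.select(
    map($"column1", $"transformedCol1").as("column1"), 
    map($"column2", $"transformedCol2").as("column2") 
) 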

val cols = inputToMap.columns 
val localData = inputToMap.collect 

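// Flatten each column's single-entry maps into one lookup map per column
val result: Map[String, Map[String, Double]] = cols.map { colName => 
    colName -> localData.flatMap(_.getAs[Map[String, Double]](colName)).toMap 
}.toMap 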
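
The flattening step relies on how `flatMap` and `toMap` behave on local collections, which can be checked without Spark. The sketch below is only an illustration: `column1Maps` is a hypothetical stand-in for the single-entry maps collected for one column, and `conflicting` is an invented case that does not occur in the example data.

```scala
// Hypothetical local stand-in for the collected single-entry maps of one column.
val column1Maps: Seq[Map[String, Double]] = Seq(
  Map("A" -> 0.3), Map("A" -> 0.3), Map("d" -> 0.0), Map("c" -> 0.2)
)

// flatMap turns the maps into (key, value) pairs; toMap keeps one entry per key.
val merged: Map[String, Double] = column1Maps.flatMap(_.toSeq).toMap

// If a key appeared with different values, toMap would keep the last pair seen.
val conflicting: Map[String, Double] =
  Seq(Map("A" -> 0.3), Map("A" -> 0.9)).flatMap(_.toSeq).toMap

println(merged)
println(conflicting)
```

In the example data every key maps to a single transformed value, so the duplicates collapse harmlessly. If that invariant did not hold, the surviving value would depend on the order of the collected rows.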
0

I'm not sure I follow the motivation, but I think this transformation will get you the result you're after:

import org.apache.spark.sql.Row

// collect from DF (by your assumption - it is small enough) 
val data: Array[Row] = inputSmall.collect() 

// Create the "column pairs" - 
// can be replaced with hard-coded value: List(("column1", "transformedCol1"), ("column2", "transformedCol2")) 
val columnPairs: List[(String, String)] = inputSmall.columns 
    .grouped(2) 
    .collect { case Array(k, v) => (k, v) } 
    .toList 

// for each pair, get data and group it by left-column's value, choosing first match 
val result: Map[String, Map[String, Double]] = columnPairs 
    .map { case (k, v) => k -> data.map(r => (r.getAs[String](k), r.getAs[Double](v))) } 
    .toMap 
    .mapValues(l => l.groupBy(_._1).map { case (c, l2) => l2.head }) 

result.foreach(println) 
// prints: 
// (column1,Map(A -> 0.3, d -> 0.0, c -> 0.2)) 
// (column2,Map(d -> 0.7, g -> 0.4, f -> 0.1, B -> 0.25))
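
Once such a nested map exists, applying it in the prediction step comes down to two lookups. The following is a minimal sketch in plain Scala, not part of either answer: `lookup` is a hypothetical stand-in holding the values printed above, and `transform` with its `default` parameter is an invented helper for values that were not seen during training.

```scala
// Hypothetical stand-in for the nested map built above.
val lookup: Map[String, Map[String, Double]] = Map(
  "column1" -> Map("A" -> 0.3, "d" -> 0.0, "c" -> 0.2),
  "column2" -> Map("d" -> 0.7, "g" -> 0.4, "f" -> 0.1, "B" -> 0.25)
)

// Hypothetical helper: find the transformed value for a raw value of a column,
// falling back to `default` when the column or the value is unknown.
def transform(column: String, value: String, default: Double = Double.NaN): Double =
  lookup.get(column).flatMap(_.get(value)).getOrElse(default)

println(transform("column1", "A"))         // 0.3
println(transform("column2", "g"))         // 0.4
println(transform("column2", "zzz", -1.0)) // -1.0
```

Using `get` on both levels avoids a NoSuchElementException for unseen values. In a Spark job one would typically broadcast the map and call a helper like this from a UDF, but that wiring is an assumption here and is left out.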