2016-04-07 89 views
4

我們使用融合和dcast轉換數據從寬 - >長和長 - >寬格式。 有關更多詳細信息,請參閱http://seananderson.ca/2013/10/19/reshape.htmlSpark是否支持融合和dcast

scala或SparkR都沒問題。

我已經通過這blogscala functionsR API。 我沒有看到做類似工作的功能。

Spark中是否有等價函數?如果不是的話,還有其他方法可以在Spark中做到嗎?

+0

看起來不像它。如果你可以將你的數據放入內存中,可以使用'as.data.frame()'將Spark DataFrame轉換爲本地數據幀,然後重寫它,並將其寫回Spark。 – Thomas

+0

因爲沒有。你需要自己寫。 – eliasah

回答

10

Reshaping Data with Pivot in Spark支持重塑pivot。據我所知,melt大致與主數據相反,也稱爲unpivot。我對Spark比較陌生。據我所知,我試圖實施融化操作。

def melt(df: DataFrame, columns: List[String]): DataFrame ={ 

    val restOfTheColumns = df.columns.filterNot(columns.contains(_)) 
    val baseDF = df.select(columns.head, columns.tail: _*) 
    val newStructure =StructType(baseDF.schema.fields ++ List(StructField("variable", StringType, true), StructField("value", StringType, true))) 
    var newdf = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], newStructure) 

    for(variableCol <- restOfTheColumns){ 
     val colValues = df.select(variableCol).map(r=> r(0).toString) 
     val colRdd=baseDF.rdd.zip(colValues).map(tuple => Row.fromSeq(tuple._1.toSeq.:+(variableCol).:+(tuple._2.toString))) 
     var colDF =sqlContext.createDataFrame(colRdd, newStructure) 
     newdf =newdf.unionAll(colDF) 
    } 
    newdf 
    } 

它做的工作。但我對效率不是很確定。

+-----+---+---+----------+------+ 
| name|sex|age| street|weight| 
+-----+---+---+----------+------+ 
|Alice| f| 34| somewhere| 70| 
| Bob| m| 63| nowhere| -70| 
|Alice| f|612|nextstreet| 23| 
| Bob| m|612|  moon|  8| 
+-----+---+---+----------+------+ 

可作爲

melt(df, List("name", "sex")) 

結果如下:

+-----+---+--------+----------+ 
| name|sex|variable|  value| 
+-----+---+--------+----------+ 
|Alice| f|  age|  34| 
| Bob| m|  age|  63| 
|Alice| f|  age|  612| 
| Bob| m|  age|  612| 
|Alice| f| street| somewhere| 
| Bob| m| street| nowhere| 
|Alice| f| street|nextstreet| 
| Bob| m| street|  moon| 
|Alice| f| weight|  70| 
| Bob| m| weight|  -70| 
|Alice| f| weight|  23| 
| Bob| m| weight|   8| 
+-----+---+--------+----------+ 

我希望它是有用的,感謝您的意見,如果有改進的空間。

0

這裏的,只是使用的數據集操作(無RDD東西)

case class Melt(meltColumns: String*) extends Transformer{ 

    override def transform(in: Dataset[_]): DataFrame = { 
    val nonMeltColumns = in.columns.filterNot{ meltColumns.contains } 
    val newDS = in 
     .select(nonMeltColumns.head,meltColumns:_*) 
     .withColumn("variable", functions.lit(nonMeltColumns.head)) 
     .withColumnRenamed(nonMeltColumns.head,"value") 

    nonMeltColumns.tail 
     .foldLeft(newDS){ case (acc,col) => 
     in 
      .select(col,meltColumns:_*) 
      .withColumn("variable", functions.lit(col)) 
      .withColumnRenamed(col,"value") 
      .union(acc) 
     } 
     .select(meltColumns.head,meltColumns.tail ++ List("variable","value") : _*) 
    } 

    override def copy(extra: ParamMap): Transformer = defaultCopy(extra) 

    @DeveloperApi 
    override def transformSchema(schema: StructType): StructType = ??? 

    override val uid: String = Identifiable.randomUID("Melt") 
} 

下面是一個使用它

"spark" should "melt a dataset" in { 
    import spark.implicits._ 
    val schema = StructType(
     List(StructField("Melt1",StringType),StructField("Melt2",StringType)) ++ 
     Range(3,10).map{ i => StructField("name_"+i,DoubleType)}.toList) 

    val ds = Range(1,11) 
     .map{ i => Row("a" :: "b" :: Range(3,10).map{ j => Math.random() }.toList :_ *)} 
     .|>{ rows => spark.sparkContext.parallelize(rows) } 
     .|>{ rdd => spark.createDataFrame(rdd,schema) } 

    val newDF = ds.transform{ df => 
     Melt("Melt1","Melt2").transform(df) } 

    assert(newDF.count() === 70) 
    } 

測試一個spark.ml.Transformer |>是scalaZ管道運營商

0

星火DataFrame有explode方法,它提供了R melt的功能。 它適用於Spark 1.6.1的示例:

// input df has columns (anyDim, n1, n2) 
case class MNV(measureName: String, measureValue: Integer); 
val dfExploded = df.explode(col("n1"), col("n2")) { 
    case Row(n1: Int, n2: Int) => 
    Array(MNV("n1", n1), MNV("n2", n2)) 
} 
// dfExploded has columns (anyDim, n1, n2, measureName, measureValue)