我們使用融合和dcast轉換數據從寬 - >長和長 - >寬格式。 有關更多詳細信息,請參閱http://seananderson.ca/2013/10/19/reshape.html。Spark是否支持融合和dcast
scala或SparkR都沒問題。
我已經通過這blog和scala functions和R API。 我沒有看到做類似工作的功能。
Spark中是否有等價函數?如果不是的話,還有其他方法可以在Spark中做到嗎?
我們使用融合和dcast轉換數據從寬 - >長和長 - >寬格式。 有關更多詳細信息,請參閱http://seananderson.ca/2013/10/19/reshape.html。Spark是否支持融合和dcast
scala或SparkR都沒問題。
我已經通過這blog和scala functions和R API。 我沒有看到做類似工作的功能。
Spark中是否有等價函數?如果不是的話,還有其他方法可以在Spark中做到嗎?
Reshaping Data with Pivot in Spark支持重塑pivot
。據我所知,melt
大致與主數據相反,也稱爲unpivot
。我對Spark
比較陌生。據我所知,我試圖實施融化操作。
def melt(df: DataFrame, columns: List[String]): DataFrame ={
val restOfTheColumns = df.columns.filterNot(columns.contains(_))
val baseDF = df.select(columns.head, columns.tail: _*)
val newStructure =StructType(baseDF.schema.fields ++ List(StructField("variable", StringType, true), StructField("value", StringType, true)))
var newdf = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], newStructure)
for(variableCol <- restOfTheColumns){
val colValues = df.select(variableCol).map(r=> r(0).toString)
val colRdd=baseDF.rdd.zip(colValues).map(tuple => Row.fromSeq(tuple._1.toSeq.:+(variableCol).:+(tuple._2.toString)))
var colDF =sqlContext.createDataFrame(colRdd, newStructure)
newdf =newdf.unionAll(colDF)
}
newdf
}
它做的工作。但我對效率不是很確定。
+-----+---+---+----------+------+
| name|sex|age| street|weight|
+-----+---+---+----------+------+
|Alice| f| 34| somewhere| 70|
| Bob| m| 63| nowhere| -70|
|Alice| f|612|nextstreet| 23|
| Bob| m|612| moon| 8|
+-----+---+---+----------+------+
可作爲
melt(df, List("name", "sex"))
結果如下:
+-----+---+--------+----------+
| name|sex|variable| value|
+-----+---+--------+----------+
|Alice| f| age| 34|
| Bob| m| age| 63|
|Alice| f| age| 612|
| Bob| m| age| 612|
|Alice| f| street| somewhere|
| Bob| m| street| nowhere|
|Alice| f| street|nextstreet|
| Bob| m| street| moon|
|Alice| f| weight| 70|
| Bob| m| weight| -70|
|Alice| f| weight| 23|
| Bob| m| weight| 8|
+-----+---+--------+----------+
我希望它是有用的,感謝您的意見,如果有改進的空間。
這裏的,只是使用的數據集操作(無RDD東西)
case class Melt(meltColumns: String*) extends Transformer{
override def transform(in: Dataset[_]): DataFrame = {
val nonMeltColumns = in.columns.filterNot{ meltColumns.contains }
val newDS = in
.select(nonMeltColumns.head,meltColumns:_*)
.withColumn("variable", functions.lit(nonMeltColumns.head))
.withColumnRenamed(nonMeltColumns.head,"value")
nonMeltColumns.tail
.foldLeft(newDS){ case (acc,col) =>
in
.select(col,meltColumns:_*)
.withColumn("variable", functions.lit(col))
.withColumnRenamed(col,"value")
.union(acc)
}
.select(meltColumns.head,meltColumns.tail ++ List("variable","value") : _*)
}
override def copy(extra: ParamMap): Transformer = defaultCopy(extra)
@DeveloperApi
override def transformSchema(schema: StructType): StructType = ???
override val uid: String = Identifiable.randomUID("Melt")
}
下面是一個使用它
"spark" should "melt a dataset" in {
import spark.implicits._
val schema = StructType(
List(StructField("Melt1",StringType),StructField("Melt2",StringType)) ++
Range(3,10).map{ i => StructField("name_"+i,DoubleType)}.toList)
val ds = Range(1,11)
.map{ i => Row("a" :: "b" :: Range(3,10).map{ j => Math.random() }.toList :_ *)}
.|>{ rows => spark.sparkContext.parallelize(rows) }
.|>{ rdd => spark.createDataFrame(rdd,schema) }
val newDF = ds.transform{ df =>
Melt("Melt1","Melt2").transform(df) }
assert(newDF.count() === 70)
}
測試一個spark.ml.Transformer
|>是scalaZ管道運營商
星火DataFrame有explode
方法,它提供了R melt
的功能。 它適用於Spark 1.6.1的示例:
// input df has columns (anyDim, n1, n2)
case class MNV(measureName: String, measureValue: Integer);
val dfExploded = df.explode(col("n1"), col("n2")) {
case Row(n1: Int, n2: Int) =>
Array(MNV("n1", n1), MNV("n2", n2))
}
// dfExploded has columns (anyDim, n1, n2, measureName, measureValue)
看起來不像它。如果你可以將你的數據放入內存中,可以使用'as.data.frame()'將Spark DataFrame轉換爲本地數據幀,然後重寫它,並將其寫回Spark。 – Thomas
因爲沒有。你需要自己寫。 – eliasah