2017-07-19 114 views
0

火花數據幀甲酸轉換輸入數據幀1火花數據幀從視圖

|------|--------|-----------|---------------|------------- | 
|city product | Jan(sale) | Feb(sale) | Mar(sale)| 
|---------------|------------|--------------|-------------| 
|c1 | p1 | 123  |  22  | 34  | 
|---------------|------------|--------------|-------------| 
|c2 | p2 |  234 |  432  |  43  | 
|---------------|------------|--------------|-------------| 

到輸出dataframe2作爲整個列的轉置和列如下所示。

|city | product | metric_type metric_value| 
--------------------------------------------------- | 
| c1 | p1  |  Jan |  123   | 
---------------------------------------------------- 
| c1 |  p1  |  Feb |  22   | 
----------------------------------------------------- 
| c1 |  p1  |  Mar |  34   | 
| -------------------------------------------------- 
+1

問題是什麼? –

+0

嗨Assaf,將第一個數據框轉換爲第二個數據框,而不使用spark sql – prakash

+0

你是什麼意思,而不使用spark sql? –

回答

1

數據集的解決方案會是什麼樣子這樣的:

case class orig(city: String, product: String, Jan: Int, Feb: Int, Mar: Int) 
case class newOne(city: String, product: String, metric_type: String, metric_value: Int) 
val df = Seq(("c1", "p1", 123, 22, 34), ("c2", "p2", 234, 432, 43)).toDF("city", "product", "Jan", "Feb", "Mar") 
val newDf = df.as[orig].flatMap(v => Seq(newOne(v.city, v.product, "Jan", v.Jan), newOne(v.city, v.product, "Feb", v.Feb), newOne(v.city, v.product, "Mar", v.Mar))) 
newDf.show() 
>>+----+-------+-----------+-----------+ 
>>|city|product|metric_type|metric_value| 
>>+----+-------+-----------+-----------+ 
>>| c1|  p1|  Jan|  123| 
>>| c1|  p1|  Feb|   22| 
>>| c1|  p1|  Mar|   34| 
>>| c2|  p2|  Jan|  234| 
>>| c2|  p2|  Feb|  432| 
>>| c2|  p2|  Mar|   43| 
>>+----+-------+-----------+-----------+ 

使用數據幀API

儘管OP專門詢問沒有spark sql的數據集,但對於查看此問題的其他人,我認爲應該使用數據幀解決方案。

首先,瞭解數據集API是Spark API的一部分非常重要。數據集和數據框是可互換的,實際上數據框只是一個DataSet [Row]。雖然數據集既有「鍵入」也有「無類型」的API,但忽略某些API似乎對我來說是錯誤的。

二,純「打字」選項有侷限性。例如,如果我們有100個月而不是3個月,那麼以上述方式來做就不切實際了。

最後,Spark在使用類型化API時(因爲類型化API對Spark不透明)提供了很多對數據框的優化,因此在很多情況下性能會變差。

我會建議使用下面的數據幀的解決方案:

val df = Seq(("c1", "p1", 123, 22, 34), ("c2", "p2", 234, 432, 43)).toDF("city", "product", "Jan", "Feb", "Mar") 
val months = Seq("Jan", "Feb", "Mar") 
val arrayedDF = df.withColumn("combined", array(months.head, months.tail: _*))_*)).select("city", "product", "combined") 
val explodedDF = arrayedDF.selectExpr("city", "product", "posexplode(combined) as (pos, metricValue)") 
val u = udf((p: Int) => months(p)) 
val targetDF = explodedDF.withColumn("metric_type", u($"pos")).drop("pos") 
targetDF.show() 
>>+----+-------+-----------+-----------+ 
>>|city|product|metricValue|metric_type| 
>>+----+-------+-----------+-----------+ 
>>| c1|  p1|  123|  Jan| 
>>| c1|  p1|   22|  Feb| 
>>| c1|  p1|   34|  Mar| 
>>| c2|  p2|  234|  Jan| 
>>| c2|  p2|  432|  Feb| 
>>| c2|  p2|   43|  Mar| 
>>+----+-------+-----------+-----------+ 

雖然這是長一點,它處理更一般的情況下。

+0

嗨Assaf,df.as [orig]不工作,編譯時錯誤◾無法找到存儲在數據集中的類型的編碼器。通過導入spark.implicits._支持原始類型(Int,String等)和Product類型(case類)。將來的發行版中將添加對序列化其他類型的支持。 沒有足夠的方法參數:(隱式證據$ 2:org.apache.spark.sql.Encoder [orig])org.apache.spark.sql.Dataset [orig]。未指定的值參數證明$ 2。 – prakash

+0

您使用的是什麼火花版本? –

+0

2.1.0 spark版本 – prakash

0

你需要從廣角轉換數據幀長格式(或收集列或unpivot的數據幀),一種選擇是使用flatMap

import org.apache.spark.sql.types._ 
import org.apache.spark.sql.Row 

val df = Seq(("c1", "p1", 123, 22, 34), ("c2", "p2", 234, 432, 43)).toDF("city", "product", "Jan", "Feb", "Mar") 

df.show 
+----+-------+---+---+---+ 
|city|product|Jan|Feb|Mar| 
+----+-------+---+---+---+ 
| c1|  p1|123| 22| 34| 
| c2|  p2|234|432| 43| 
+----+-------+---+---+---+ 

// schema of the result data frame 
val schema = StructType(List(StructField("city", StringType, true), 
          StructField("product", StringType,true), 
          StructField("metric_type", StringType, true), 
          StructField("metric_value", IntegerType, true))) 

val months = List("Jan", "Feb", "Mar")  
val index = List("city", "product") 

// use flatMap to convert each row into three rows 
val rdd = df.rdd.flatMap(
    row => { 
     val index_values = index.map(i => row.getAs[String](i)) 
     months.map(m => Row.fromSeq(index_values ++ List(m, row.getAs[Int](m)))) 
    } 
) 

spark.createDataFrame(rdd, schema).show 
+----+-------+-----------+------------+ 
|city|product|metric_type|metric_value| 
+----+-------+-----------+------------+ 
| c1|  p1|  Jan|   123| 
| c1|  p1|  Feb|   22| 
| c1|  p1|  Mar|   34| 
| c2|  p2|  Jan|   234| 
| c2|  p2|  Feb|   432| 
| c2|  p2|  Mar|   43| 
+----+-------+-----------+------------+ 
只有
+0

嗨Sathiyarajan,感謝您給解決方案,但我需要的解決方案不使用spark sql,我的項目經理只需要使用數據集/數據幀API方法(https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset ),請以其他方式指導我 – prakash

+0

Hi先生Psidom,請解釋它,它是如何工作的 – prakash