2017-05-30 75 views
0

我有下面的Dataframe,並且我想僅使用RDD將其展平。任何人都可以幫忙嗎?Scala/Spark:使用RDD唯一功能壓扁DataFrame

輸入數據幀:

 
    +---------+-------------+-----------------+-----+----------------+------------------------------------------------------+ 
    |TPNB  |unitOfMeasure|locationReference|types|types   |effectiveDateTime          | 
    +---------+-------------+-----------------+-----+----------------+------------------------------------------------------+ 
    |079562193|EA   |0810    |STORE|[SELLABLE, HELD]|[2015-10-09T00:55:23.6345Z, 2015-10-09T00:55:23.6345Z]| 
    +---------+-------------+-----------------+-----+----------------+------------------------------------------------------+ 

輸出:

 
TPNB  unitOfMeasure locationReference types types  effectiveDateTime 
079562193 EA    0810    STORE SELLABLE 2015-10-09T00:55:23.6345Z 
079562193 EA    0810    STORE HELD  2015-10-09T00:55:23.6345Z 

我是想這樣的事情,這犯規似乎是工作。

 
    final_output.map(value=>((value(0),value(1),value(2),value(3)),value(5),value(6))).map{ 
     case(key,value)=>value.map(records=>(key,records)) 
    } 

+0

'final_output.rdd'應該給你rdd數據,你有沒有試過? –

+0

是的,我試過了。它沒有工作 –

+0

當你使用.rdd時,問題是什麼? –

回答

1

這是你在找什麼只RDD。將第5行和第6行轉換爲Map併爲每個行創建一行。

import spark.implicits._ 

    val data = spark.sparkContext 
    .parallelize(
     Seq(
     ("079562193", 
     "EA", 
     "0810", 
     "STORE", 
     Array("SELLABLE", "HELD"), 
     Array("2015-10-09T00:55:23.6345Z", "2015-10-09T00:55:23.6345Z")) 
    )) 

    val result = data 
    .map(row => (row._1, row._2, row._3, row._4, (row._5.zip(row._6).toMap))) 
    .map(r => { 
     r._5.map(v => (r._1, r._2, r._3, r._4, v._1, v._2)) 
    }) 
    .collect() 
    .foreach(println) 

((079562193,EA,0810,STORE,SELLABLE,2015-10-09T00:55:23.6345Z) 
(079562193,EA,0810,STORE,HELD,2015-10-09T00:55:23.6345Z)) 
+0

嗨Sankar,當我試圖將rdd轉換爲數據框並將其應用於數據框時,這不起作用。像這樣STEP1:val dataDF = sqlContext.createDataFrame(data).toDF(「TPNB」,「unitOfMeasure」,「locationReference」,「types」,「state」,「effectiveDateTime」)STEP2:dataDF.rdd.map(row = >(row(0),row(1),row(2),row(3),(row(4).zip(row(5)).toMap)))。flatMap(r => { r。 _5.map(v =>(r._1,r._2,r._3,r._4,v._1,v._2)) }) .collect() .foreach(println) –

+0

Hi Shankar ,這項工作只適用於rdd。當你使用.rdd將其應用於數據框時,它不起作用 –

+0

是的,你問這個問題只適用於RDD,所以這是RDD的解決方案。對於數據幀我們需要不同的解決方案 –

1

要變換使用RDD唯一的功能,你可以做一些類似(通過df.rdd如)您的數據幀轉換爲RDD後,下面的內容:

val rdd = sc.parallelize(Seq(
    ("079562193", "EA", "0810", "STORE", List("SELLABLE", "HELD"), List("2015-10-09T00:55:23.6345Z", "2015-10-09T00:55:23.6345Z")) 
)). 
    map{ case (t, u, l, y, ts, ds) => ((t, u, l, y), (ts, ds)) }. 
    flatMapValues{ case (x, y) => x zip y }. 
    map{ case ((t, u, l, y), (ts, ds)) => Seq(t, u, l, y, ts, ds) } 

rdd.collect.foreach(println) 
List(079562193, EA, 0810, STORE, SELLABLE, 2015-10-09T00:55:23.6345Z) 
List(079562193, EA, 0810, STORE, HELD, 2015-10-09T00:55:23.6345Z) 
+0

嗨,Leo,當我嘗試將rdd轉換爲數據幀並將其應用於數據幀時,他不工作。像這樣STEP1:val dataDF = sqlContext.createDataFrame(data).toDF(「TPNB」,「unitOfMeasure」,「locationReference」,「types」,「state」,「ef fectiveDateTime」)STEP2:dataDF。 rdd.map {case(t,u,l,y,ts,ds)=>((t,u,l,y),(ts,ds))}。 flatMapValues {case(x,y)=> x zip y}。 ((t,u,l,y),(ts,ds))=> Seq(t,u,l,y,ts,ds)} .collect.foreach(println) –

+0

@Rohan Nayak,那是因爲根據您的原始請求,這些都是RDD轉換。 –