2016-11-26 94 views
1

我有以下代碼。Spark 1.6 scala創建數據行

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) 
val baseDF = sqlContext.read.json(fileFullPath) 

我的json有2個感興趣的領域:ProductId和Quantity。我在找

{ 
    "sales": { 
     "saledate": "17Mar2008", 
     "sale": [{ 
      "productid": 1, 
      "quantity": 10 
     }, { 
      "productid": 2, 
      "quantity": 1 
     }, { 
      "productid": 3, 
      "quantity": 3 
     }, { 
      "productid": 4, 
      "quantity": 5 
     }] 
    } 
} 

我想改變這其中有2列,基於數量的productid和數量,但多行的火花RDD或DF。我想每個數量1。

在上面的例子中,產品1有10行,產品2有1,產品3有3,產品4有5行,共計19行,即#rows = sum(quantity)。

任何幫助表示讚賞。我正在使用spark 1.6.2和scala。

+0

請改變你的問題,目前它是完全不可讀的 –

+0

對不起...第一次發佈在堆棧上..謝謝:@gasparms – SSC

+0

沒問題 - 我寫了,因爲別人可以downvote問題,因爲格式不佳;) –

回答

0

這應該做的事:

import org.apache.spark.sql.functions._ 

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) 
import sqlContext.implicits._ 

val baseDF = sqlContext.read.json(fileFullPath) 
val listFromQuantity = udf { quantity: Int => List.fill(quantity)(quantity) } 

baseDF.select(explode($"sales.sale")).select($"col.productId", explode(listFromQuantity($"col.quantity"))).show() 

將返回:

+---------+--------+ 
|productId|quantity| 
+---------+--------+ 
|  1|  10| 
|  1|  10| 
|  1|  10| 
|  1|  10| 
|  1|  10| 
|  1|  10| 
|  1|  10| 
|  1|  10| 
|  1|  10| 
|  1|  10| 
|  2|  1| 
|  3|  3| 
|  3|  3| 
|  3|  3| 
|  4|  5| 
|  4|  5| 
|  4|  5| 
|  4|  5| 
|  4|  5| 
+---------+--------+ 

如果你想有第二列單數量(如具有價值1代替5)你應該用List.fill(quantity)(1)替換List.fill(quantity)(quantity)

+0

工作就像一個魅力....謝謝soooo多。 .. – SSC