2016-08-01 73 views
0

我正在開始使用apache spark。 我有一個要求將json日誌轉換爲扁平指標,也可以認爲是一個簡單的csv。從apache spark中的JSON日誌創建聚合指標

例如,

"orderId":1, 
    "orderData": { 
    "customerId": 123, 
    "orders": [ 
    { 
     "itemCount": 2, 
     "items": [ 
     { 
      "quantity": 1, 
      "price": 315 
     }, 
     { 
      "quantity": 2, 
      "price": 300 
     }, 

     ] 
    } 
    ] 
} 

這可以被視爲一個單一的JSON日誌,我打算將它轉換成,

orderId,customerId,totalValue,units 
    1 , 123 , 915 , 3 

我正在經歷sparkSQL文檔,並可以用它來獲得像單值保持「選擇訂單中的orderId,orderData.customerId「,但我不知道如何獲得所有價格和單位的總和。

什麼應該是最好的做法,以完成這個使用Apache的火花?

+0

着,我們不喜歡數據幀DF = sqlContext.read()JSON。 ( 「/路徑/到/文件」)toDF(); df.registerTempTable(「df」); df.printSchema();之後通過sql執行聚合? –

+0

通過SQL,我可以掌握單個元素,但不知道有關orders.items,我如何在此上運行聚合?我認爲它只會作爲一個json值,如果我錯過了某些東西,請糾正我。 – fireants

+0

你可以看看[this](http://xinhstechblog.blogspot.in/2015/06/reading-json-data-in-spark-dataframes.html)&[nested json](http:// xinhstechblog .blogspot.in/2016/05/reading-json-nested-array-in-spark.html) –

回答

1

嘗試:

>>> from pyspark.sql.functions import * 
>>> doc = {"orderData": {"orders": [{"items": [{"quantity": 1, "price": 315}, {"quantity": 2, "price": 300}], "itemCount": 2}], "customerId": 123}, "orderId": 1} 
>>> df = sqlContext.read.json(sc.parallelize([doc])) 
>>> df.select("orderId", "orderData.customerId", explode("orderData.orders").alias("order")) \ 
... .withColumn("item", explode("order.items")) \ 
... .groupBy("orderId", "customerId") \ 
... .agg(sum("item.quantity"), sum(col("item.quantity") * col("item.price"))) 
+0

感謝您的工作邏輯,我會嘗試映射它在Java中,並張貼在這裏爲他人。 – fireants

0

對於誰是尋找上述的Java解決方案的人,請按:

SparkSession spark = SparkSession 
      .builder() 
      .config(conf) 
      .getOrCreate(); 

    SQLContext sqlContext = new SQLContext(spark); 

    Dataset<Row> orders = sqlContext.read().json("order.json"); 
    Dataset<Row> newOrders = orders.select(
      col("orderId"), 
      col("orderData.customerId"), 
      explode(col("orderData.orders")).alias("order")) 
      .withColumn("item",explode(col("order.items"))) 
      .groupBy(col("orderId"),col("customerId")) 
      .agg(sum(col("item.quantity")),sum(col("item.price"))); 
    newOrders.show();