
I have code that transforms the rows of a DataFrame, but I am having trouble producing the output as an array: how can I convert DataFrame rows into a JSON array output using Spark DataFrames?

Input: file.txt

+-----+---+-----+
|id   |var|score|
+-----+---+-----+
|12345|A  |8    |
|12345|B  |9    |
|12345|C  |7    |
|12345|D  |6    |
+-----+---+-----+

Output:

{"id":"12345","props":[{"var":"A","score":"8"},{"var":"B","score":"9"},{"var":"C","score":"7"},{"var":"D","score":"6"}]} 

I tried using collect_list without success. My code is in Scala:

import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.functions.{col, collect_list, udf}

val sc = new SparkContext(conf)
val sqlContext = new HiveContext(sc)

val df = sqlContext.read.json("file.txt")
val dfCol = df.select(df("id"), df("var"), df("score"))
dfCol.show(false)

// Pack the two columns into a single "var,score" string
// (`var` is a reserved word in Scala, so the parameter is named v).
val merge = udf { (v: String, score: Double) => v + "," + score }

val grouped = dfCol.groupBy(col("id"))
  .agg(collect_list(merge(col("var"), col("score"))).alias("props"))
grouped.show(false)

My question is: how do I turn these rows into the JSON array output shown above?

Thanks.


Why don't you try grouping the DF by id and then writing the DF out as JSON directly? I would expect that to return the props as an array of var and score. – Shankar
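For reference, a minimal sketch of what this comment suggests, assuming Spark 2.x, where collect_list accepts struct columns (on Spark 1.6, used in the answer below, collect_list is limited to primitive types, which is presumably why the accepted workaround concatenates strings); the "output" path is a placeholder:

import org.apache.spark.sql.functions.{col, collect_list, struct}

// Group the rows per id and collect (var, score) pairs into an array column.
val props = dfCol
  .groupBy(col("id"))
  .agg(collect_list(struct(col("var"), col("score"))).alias("props"))

// Writing as JSON yields one object per id, matching the desired output.
props.write.json("output")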

Answers


OK, I found the answer to my question.

import scala.collection.mutable.ListBuffer

import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.functions.{col, collect_list, concat_ws, udf}

// `var` is a reserved word in Scala, so the field name needs backticks;
// the column still serializes as "var".
case class Props(`var`: String, score: Double)
case class PropsArray(id: String, props: Seq[Props])

val sc = new SparkContext(conf)
val sqlContext = new HiveContext(sc)
import sqlContext.implicits._

val df = sqlContext.read.json("file.txt")
val dfCol = df.select(df("id"), df("var"), df("score"))

// Pack the two columns into a single "var,score" string.
val merge = udf { (v: String, score: Double) => v + "," + score }

// Collect the packed strings per id and join them with "|".
val grouped = dfCol.groupBy(col("id"))
  .agg(concat_ws("|", collect_list(merge(col("var"), col("score")))).alias("props"))

// Split the packed strings back into Props objects and rebuild a DataFrame.
val merging = grouped.map { x =>
  val list: ListBuffer[Props] = ListBuffer()
  val data = x.getAs[String]("props").split("\\|")

  data.foreach { entry =>
    val arr = entry.split(",")
    try {
      list.+=:(Props(arr(0), arr(1).toDouble)) // prepend each entry
    } catch {
      case t: Throwable => t.getMessage // skip malformed entries
    }
  }

  PropsArray(x.getAs("id"), list.toSeq)
}.toDF()

Then you can run:

merging.show(false) 
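show(false) prints a tabular view; to get the actual JSON lines from the question, a small sketch using the standard DataFrame API (note that score comes out as a number, e.g. 8.0, because Props declares it as Double):

// Each element is a JSON string such as
// {"id":"12345","props":[{"var":"A","score":8.0}, ...]}
merging.toJSON.collect().foreach(println)

// Or write the whole DataFrame out as JSON files ("output" is a placeholder path).
merging.write.json("output")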

and you need to add this library to your pom.xml:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.6.0</version>
  <exclusions>
    <exclusion>
      <artifactId>kryo</artifactId>
      <groupId>com.esotericsoftware.kryo</groupId>
    </exclusion>
  </exclusions>
</dependency>
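Since the code uses HiveContext and the DataFrame API, you will most likely also need the SQL and Hive modules alongside spark-core; a sketch of the extra entries, assuming the same version and Scala 2.10 build:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.10</artifactId>
  <version>1.6.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.10</artifactId>
  <version>1.6.0</version>
</dependency>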

