0

我正在讀取一個目錄中的多個JSON文件;這個JSON在一個數組中有多個項目'cars'。我試圖將項目'car'中的離散值分解併合併到一個數據框中。在Apache Spark中,我如何合併爆炸JSON數組中的多個SQL列?

一個JSON文件看起來像:

{ 
    "cars": { 
     "items": 
      [ 
       { 

        "latitude": 42.0001, 
        "longitude": 19.0001, 
        "name": "Alex" 
       }, 
       { 

        "latitude": 42.0002, 
        "longitude": 19.0002, 
        "name": "Berta" 
       }, 
       { 

        "latitude": 42.0003, 
        "longitude": 19.0003, 
        "name": "Chris" 
       }, 
       { 

        "latitude": 42.0004, 
        "longitude": 19.0004, 
        "name": "Diana" 
       } 
      ] 
    } 
} 

我爆炸和合並的值只是一個數據幀的方法是:

// Read JSON files 
val jsonData = sqlContext.read.json(s"/mnt/$MountName/.") 
// To sqlContext to DataFrame 
val jsonDF = jsonData.toDF() 

/* Approach 1 */ 
// User-defined function to 'zip' two columns 
val zip = udf((xs: Seq[Double], ys: Seq[Double]) => xs.zip(ys)) 
jsonDF.withColumn("vars", explode(zip($"cars.items.latitude", $"cars.items.longitude"))).select($"cars.items.name", $"vars._1".alias("varA"), $"vars._2".alias("varB")) 

/* Apporach 2 */ 
val df = jsonData.select($"cars.items.name", $"cars.items.latitude", $"cars.items.longitude").toDF("name", "latitude", "longitude") 
val df1 = df.select(explode(df("name")).alias("name"), df("latitude"), df("longitude")) 
val df2 = df1.select(df1("name").alias("name"), explode(df1("latitude")).alias("latitude"), df1("longitude")) 
val df3 = df2.select(df2("name"), df2("latitude"), explode(df2("longitude")).alias("longitude")) 

正如你可以看到方法的結果1僅僅是兩個離散「合併」參數的數據幀,如:

+--------------------+---------+---------+ 
|    name|  varA|  varB| 
+--------------------+---------+---------+ 
|[Leo, Britta, Gor...|48.161079|11.556778| 
|[Leo, Britta, Gor...|48.124666|11.617682| 
|[Leo, Britta, Gor...|48.352043|11.788091| 
|[Leo, Britta, Gor...| 48.25184|11.636337| 

對引的結果如下:

+----+---------+---------+ 
|name| latitude|longitude| 
+----+---------+---------+ 
| Leo|48.161079|11.556778| 
| Leo|48.161079|11.617682| 
| Leo|48.161079|11.788091| 
| Leo|48.161079|11.636337| 
| Leo|48.161079|11.560595| 
| Leo|48.161079|11.788632| 

(結果是每個「名稱」與每個與每個「經度」「緯度」的映射)

結果應該是如下:

+--------------------+---------+---------+ 
|    name|  varA|  varB| 
+--------------------+---------+---------+ 
|Leo     |48.161079|11.556778| 
|Britta    |48.124666|11.617682| 
|Gorch    |48.352043|11.788091| 

你知道如何讀取文件,拆分和合並每行只是一個對象的值嗎?

非常感謝您的幫助!

回答

0

爲了得到預期的結果,你可以嘗試以下方法:

// Read JSON files 
val jsonData = sqlContext.read.json(s"/mnt/$MountName/.") 
// To sqlContext to DataFrame 
val jsonDF = jsonData.toDF() 

// Approach 
val df1 = jsonDF.select(explode(df("cars.items")).alias("items")) 
val df2 = df1.select("items.name", "items.latitude", "items.longitude") 

上述方法將提供以下結果:

+-----+--------+---------+ 
| name|latitude|longitude| 
+-----+--------+---------+ 
| Alex| 42.0001| 19.0001| 
|Berta| 42.0002| 19.0002| 
|Chris| 42.0003| 19.0003| 
|Diana| 42.0004| 19.0004| 
+-----+--------+---------+