2017-04-23 18 views
1

RDD中的每條記錄都包含一個json。我使用SQLContext從Json的這樣創建一個數據框:如何展開DataFrame中的數組(來自JSON)?

val signalsJsonRdd = sqlContext.jsonRDD(signalsJson) 

下面是架構。 datapayload是一組項目。我想分解項目數組以獲得數據框,其中每行是來自datapayload的項目。我試圖根據this答案做一些事情,但似乎我需要在案例Row(arr:Array [...])聲明中模擬項目的整個結構。我可能錯過了一些東西。

val payloadDfs = signalsJsonRdd.explode($"data.datapayload"){ 
    case org.apache.spark.sql.Row(arr: Array[String]) => arr.map(Tuple1(_)) 
} 

上面的代碼拋出一個scala.MatchError,因爲實際行的類型是從行非常不同(ARR:數組[字符串])。可能有一種簡單的方法來做我想做的事,但我找不到它。請幫忙。

架構下面提供

signalsJsonRdd.printSchema() 

root 
|-- _corrupt_record: string (nullable = true) 
|-- data: struct (nullable = true) 
| |-- dataid: string (nullable = true) 
| |-- datapayload: array (nullable = true) 
| | |-- element: struct (containsNull = true) 
| | | |-- Reading: struct (nullable = true) 
| | | | |-- A2DPActive: boolean (nullable = true) 
| | | | |-- Accuracy: double (nullable = true) 
| | | | |-- Active: boolean (nullable = true) 
| | | | |-- Address: string (nullable = true) 
| | | | |-- Charging: boolean (nullable = true) 
| | | | |-- Connected: boolean (nullable = true) 
| | | | |-- DeviceName: string (nullable = true) 
| | | | |-- Guid: string (nullable = true) 
| | | | |-- HandsFree: boolean (nullable = true) 
| | | | |-- Header: double (nullable = true) 
| | | | |-- Heading: double (nullable = true) 
| | | | |-- Latitude: double (nullable = true) 
| | | | |-- Longitude: double (nullable = true) 
| | | | |-- PositionSource: long (nullable = true) 
| | | | |-- Present: boolean (nullable = true) 
| | | | |-- Radius: double (nullable = true) 
| | | | |-- SSID: string (nullable = true) 
| | | | |-- SSIDLength: long (nullable = true) 
| | | | |-- SpeedInKmh: double (nullable = true) 
| | | | |-- State: string (nullable = true) 
| | | | |-- Time: string (nullable = true) 
| | | | |-- Type: string (nullable = true) 
| | | |-- Time: string (nullable = true) 
| | | |-- Type: string (nullable = true) 
+0

看起來非常類似於http://stackoverflow.com/q/43411832/1305344。 –

回答

2

TL;博士explode功能是你的朋友(或我的最愛flatMap)。

explode函數爲給定數組或地圖列中的每個元素創建一個新行。

類似下面應該工作:

signalsJsonRdd.withColumn("element", explode($"data.datapayload")) 

functions對象。

+0

謝謝,這工作。你能幫我理解函數explode和方法signalsRdd.explode有什麼區別嗎?我正在使用該方法,因爲這是我在每個示例中看到的並且無法使其工作。 –

+1

如你所願... http://stackoverflow.com/q/43582989/1305344 –

相關問題