2017-09-23

JSON parsing in a Spark UDF returns unexpected output

I have a DataFrame.

All columns in this DataFrame are of string type. Some of the columns hold JSON strings.

+--------+---------+-------------------------+
|event_id|event_key|                   rights|
+--------+---------+-------------------------+
|     410|(default)|{"conditions":[{"devic...|
+--------+---------+-------------------------+

I want to parse each JSON string, pull one value out of it, and add that value as a new column. I'm using the Jackson parser to do this.

Here is the "rights" JSON:

{ 
"conditions": [ 
    { 
     "devices": [ 
      { 
       "connection": [ 
        "BROADBAND", 
        "MOBILE" 
       ], 
       "platform": "IOS", 
       "type": "MOBILE", 
       "provider": "TELETV" 
      }, 
      { 
       "connection": [ 
        "BROADBAND", 
        "MOBILE" 
       ], 
       "platform": "ANDROID", 
       "type": "MOBILE", 
       "provider": "TELETV" 
      }, 
      { 
       "connection": [ 
        "BROADBAND", 
        "MOBILE" 
       ], 
       "platform": "IOS", 
       "type": "TABLET", 
       "provider": "TELETV" 
      }, 
      { 
       "connection": [ 
        "BROADBAND", 
        "MOBILE" 
       ], 
       "platform": "ANDROID", 
       "type": "TABLET", 
       "provider": "TELETV" 
      } 
     ], 
     "endDateTime": "2017-01-09T22:59:59.000Z", 
     "inclusiveGeoTerritories": [ 
      "DE", 
      "IT", 
      "ZZ" 
     ], 
     "mediaType": "Linear", 
     "offers": [ 
      { 
       "endDateTime": "2017-01-09T22:59:59.000Z", 
       "isRestartable": true, 
       "isRecordable": true, 
       "isCUTVable": false, 
       "recordingMode": "UNIQUE", 
       "retentionCUTV": "P7DT2H", 
       "retentionNPVR": "P2Y6M5DT12H35M30S", 
       "offerId": "MOTOGP-RACE", 
       "offerType": "IPPV", 
       "startDateTime": "2017-01-09T17:00:00.000Z" 
      } 
     ], 
     "platformName": "USA", 
     "startDateTime": "2017-01-09T17:00:00.000Z", 
     "territory": "USA" 
    } 
] 
} 

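Now I want to add a new column to the existing DataFrame. The new column should be named "provider", and its value comes from this path: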

conditions -> devices -> provider 
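One way to follow that path without a recursive search is json4s's `\` operator (direct child lookup) combined with pattern matching on `JArray` and `JString`. The sketch below assumes json4s-jackson on the classpath, as in the question's imports; the object name `ProviderPath` and the trimmed sample (two of the four devices kept) are illustrative, not part of the original code.

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods._

object ProviderPath {
  // Trimmed copy of the "rights" JSON above (illustrative: two of the four devices kept).
  val sample: String =
    """{"conditions":[{"devices":[
      |  {"connection":["BROADBAND","MOBILE"],"platform":"IOS","type":"MOBILE","provider":"TELETV"},
      |  {"connection":["BROADBAND","MOBILE"],"platform":"ANDROID","type":"MOBILE","provider":"TELETV"}
      |],"mediaType":"Linear","territory":"USA"}]}""".stripMargin

  /** Follows conditions -> devices -> provider using `\` (direct child, not recursive). */
  def providersAlongPath(json: JValue): List[String] =
    for {
      JArray(conditions) <- List(json \ "conditions") // skipped if missing or not an array
      condition          <- conditions
      JArray(devices)    <- List(condition \ "devices")
      device             <- devices
      JString(provider)  <- List(device \ "provider") // skipped if missing (JNothing)
    } yield provider

  def providersFrom(jsonStr: String): List[String] = providersAlongPath(parse(jsonStr))

  def main(args: Array[String]): Unit =
    println(providersFrom(sample)) // List(TELETV, TELETV)
}
```

Because every step is an explicit match, the result is a plain `List[String]` rather than a rendered JSON object; `.headOption` gives the first provider.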

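I want to do this for every row in the DataFrame, so I created a UDF: I pass the column holding the JSON string to the UDF, and inside the UDF I parse the JSON string and return a single value as a string.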

My Spark code:

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions._
import org.json4s._
import org.json4s.jackson.JsonMethods
import org.json4s.jackson.JsonMethods._

    // ...
    // some code to derive the base DataFrame `df`
    // ...

    // Parses the "rights" JSON and navigates conditions -> devices -> provider
    def fetchProvider(jsonStr: String): String = {
      val json = JsonMethods.parse(jsonStr)

      val providerData = json \\ "conditions" \\ "devices" \\ "provider"

      compact(render(providerData))
    }

    val fetchProvider_udf = udf(fetchProvider _)
    val result = df.withColumn("provider", fetchProvider_udf(col("rights")))
    result.select("event_id", "event_key", "rights", "provider").show(10)

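Also, how do I handle a key on the navigation path that is not present? Will it throw an exception? Say "conditions" is there and "devices" is there, but the "provider" key is not in the JSON string. How should I handle that?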
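With json4s, `\` on a missing key returns `JNothing` instead of throwing, so a missing "provider" can be handled by pattern matching. What can throw is `parse` on malformed (or null) input, and indexing an empty array with `(0)`. A minimal sketch, assuming json4s-jackson; `SafeProvider.firstProvider` is a hypothetical helper that returns `None` in every failure case:

```scala
import scala.util.Try
import org.json4s._
import org.json4s.jackson.JsonMethods._

object SafeProvider {
  /** First device's provider, or None if the JSON is malformed or any key on the path is absent. */
  def firstProvider(jsonStr: String): Option[String] =
    Try(parse(jsonStr)).toOption.flatMap { json =>
      (json \ "conditions") match {
        case JArray(condition :: _) =>
          (condition \ "devices") match {
            case JArray(device :: _) =>
              (device \ "provider") match {
                case JString(provider) => Some(provider)
                case _                 => None // key missing (JNothing) or not a string
              }
            case _ => None // "devices" missing, empty, or not an array
          }
        case _ => None // "conditions" missing, empty, or not an array
      }
    }
}
```

Matching on `head :: _` avoids `(0)`, so an empty array falls through to `None` instead of raising `IndexOutOfBoundsException`.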

Can someone help me?

Expected output:

+--------+---------+-------------------------+--------+
|event_id|event_key|                   rights|provider|
+--------+---------+-------------------------+--------+
|     410|(unknown)|{"conditions":[{"devic...|  TELETV|
+--------+---------+-------------------------+--------+

But I get the following output:

+--------+---------+-------------------------+---------------------------------------------------------------------------------+
|event_id|event_key|                   rights|                                                                         provider|
+--------+---------+-------------------------+---------------------------------------------------------------------------------+
|     410|(unknown)|{"conditions":[{"devic...|{"provider":"TELETV","provider":"TELETV","provider":"TELETV","provider":"TELETV"}|
+--------+---------+-------------------------+---------------------------------------------------------------------------------+

Any reason not to use Spark's 'get_json_object'? – Mariusz


The requirement is to use a Scala JSON parser. –
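For comparison with the comment's suggestion: Spark's built-in `get_json_object` takes a JSONPath-style expression and returns null when the path does not resolve or the input is not valid JSON, so no UDF is needed. A sketch assuming Spark 2.x or later and a "first condition, first device" path; the object name `GetJsonObjectProvider` and the two sample rows are illustrative.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, get_json_object}

object GetJsonObjectProvider {
  // JSONPath-style path: first condition, first device, its "provider" field.
  val providerPath: String = "$.conditions[0].devices[0].provider"

  // get_json_object yields null for an unresolved path or malformed JSON,
  // so no exception handling is needed here.
  def withProvider(df: DataFrame): DataFrame =
    df.withColumn("provider", get_json_object(col("rights"), providerPath))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("get_json_object-sketch").getOrCreate()
    import spark.implicits._

    // Illustrative rows: a trimmed "rights" value, and one without a provider key.
    val df = Seq(
      """{"conditions":[{"devices":[{"platform":"IOS","type":"MOBILE","provider":"TELETV"}]}]}""",
      """{"conditions":[{"devices":[{"platform":"IOS","type":"MOBILE"}]}]}"""
    ).toDF("rights")

    withProvider(df).show(false)
    spark.stop()
  }
}
```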

Answer


If you want to extract the value of the first provider, use the following code inside the UDF:

(json \\ "conditions" \\ "devices")(0) \\ "provider"

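Your current code collects all the providers (into a single JSON object, which is why "provider" appears four times) and then converts that whole object to a string as the UDF result.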
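A self-contained version of the answer's expression, using Scala's `(0)` for indexing. Note that `compact(render(...))` on a single `JString` would still print the value with its JSON quotes (`"TELETV"`), so this sketch matches on `JString` to return the bare string. It assumes json4s-jackson; the object name `AnswerExpression` is hypothetical, and `(0)` throws `IndexOutOfBoundsException` if "devices" is an empty array.

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods._

object AnswerExpression {
  def fetchProvider(jsonStr: String): String = {
    val json = parse(jsonStr)
    // The answer's expression: (0) picks the first element of the "devices" array.
    val providerValue = (json \\ "conditions" \\ "devices")(0) \\ "provider"
    providerValue match {
      case JString(provider) => provider // bare TELETV, without JSON quotes
      case _                 => null     // not a single string value
    }
  }
}
```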

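You should also make sure your UDF never throws an exception, because an exception inside a UDF fails the whole job. The simplest way is to return null when the JSON is malformed or a key is missing, and then filter on the new column: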

  • to investigate the rows that failed, filter by col("provider").isNull
  • to keep only valid inputs, filter by col("provider").isNotNull
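Putting that advice together: a UDF that turns every failure into null, plus the two filters. This is a sketch assuming Spark 2.x or later and json4s-jackson; `NullSafeProviderUdf`, `withProvider`, `invalidRows`, and `validRows` are hypothetical names, while the `rights` and `provider` column names come from the question.

```scala
import scala.util.Try
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, udf}
import org.json4s._
import org.json4s.jackson.JsonMethods._

object NullSafeProviderUdf {
  /** First device's provider, or null for malformed/null input or a missing key. */
  def fetchProvider(jsonStr: String): String =
    Try {
      val json = parse(jsonStr)
      // (0) on an empty array throws; the Try below converts that into null.
      (((json \ "conditions")(0) \ "devices")(0) \ "provider") match {
        case JString(provider) => provider
        case _                 => null // key missing (JNothing) or not a string
      }
    }.getOrElse(null)

  val fetchProviderUdf = udf(fetchProvider _)

  def withProvider(df: DataFrame): DataFrame =
    df.withColumn("provider", fetchProviderUdf(col("rights")))

  // Rows whose "rights" could not be resolved to a provider (for investigation).
  def invalidRows(result: DataFrame): DataFrame = result.filter(col("provider").isNull)

  // Rows with a usable provider value.
  def validRows(result: DataFrame): DataFrame = result.filter(col("provider").isNotNull)
}
```

Usage, with `df` as in the question: `val result = NullSafeProviderUdf.withProvider(df)`, then `NullSafeProviderUdf.validRows(result).show(10)`.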