JSON parsing in a Spark UDF gives unexpected output
I have a dataframe in which every column is of type string. Some columns hold JSON strings.
+--------+---------+-------------------------+
|event_id|event_key|                   rights|
+--------+---------+-------------------------+
|     410|(default)|{"conditions":[{"devic...|
+--------+---------+-------------------------+
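I want to parse the JSON string myself, take one value from it, and add that value as a new column. I am using the Jackson parser (through json4s) to do this.
Here is the value of "rights":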
{
"conditions": [
{
"devices": [
{
"connection": [
"BROADBAND",
"MOBILE"
],
"platform": "IOS",
"type": "MOBILE",
"provider": "TELETV"
},
{
"connection": [
"BROADBAND",
"MOBILE"
],
"platform": "ANDROID",
"type": "MOBILE",
"provider": "TELETV"
},
{
"connection": [
"BROADBAND",
"MOBILE"
],
"platform": "IOS",
"type": "TABLET",
"provider": "TELETV"
},
{
"connection": [
"BROADBAND",
"MOBILE"
],
"platform": "ANDROID",
"type": "TABLET",
"provider": "TELETV"
}
],
"endDateTime": "2017-01-09T22:59:59.000Z",
"inclusiveGeoTerritories": [
"DE",
"IT",
"ZZ"
],
"mediaType": "Linear",
"offers": [
{
"endDateTime": "2017-01-09T22:59:59.000Z",
"isRestartable": true,
"isRecordable": true,
"isCUTVable": false,
"recordingMode": "UNIQUE",
"retentionCUTV": "P7DT2H",
"retentionNPVR": "P2Y6M5DT12H35M30S",
"offerId": "MOTOGP-RACE",
"offerType": "IPPV",
"startDateTime": "2017-01-09T17:00:00.000Z"
}
],
"platformName": "USA",
"startDateTime": "2017-01-09T17:00:00.000Z",
"territory": "USA"
}
]
}
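Now I want to add a new column to the existing dataframe that holds one value from this JSON. The new column should be named "provider", and its value comes from the path
conditions -> devices -> provider
I want to do this for every row in the dataframe, so I created a UDF: I pass the column holding the JSON string to the UDF, parse the JSON string inside it, and return a single value as a string.
My Spark code: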
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions._
import org.json4s._
import org.json4s.jackson.JsonMethods
import org.json4s.jackson.JsonMethods._

// (some code to derive the base dataframe `df`)

// Parses the JSON string and returns the provider value(s) rendered as compact JSON.
def fetchProvider(jsonStr: String): String = {
  val json = JsonMethods.parse(jsonStr)
  // `\\` searches the whole subtree recursively at each step
  val providerData = json \\ "conditions" \\ "devices" \\ "provider"
  compact(render(providerData))
}

val fetchProvider_udf = udf(fetchProvider _)
val result = df.withColumn("provider", fetchProvider_udf(col("rights")))
result.select("event_id", "event_key", "rights", "provider").show(10)
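Also, how do I handle a key on the path that is not present? Will it throw an exception? Say "conditions" is there and "devices" is there, but the "provider" key is missing from the JSON string. How should I handle that?
Can anyone help me?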
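On the missing-key question: looking up a field that is not there does not throw in json4s. When the receiver is an object, `\` returns `JNothing` for a missing field, so the real decision is what value you want in that case. `parse` itself does throw on malformed (or null) input, so that step needs a `Try`. A minimal sketch, assuming json4s with the Jackson backend as in the question, Scala 2.12 or later (right-biased `Either`), and that the first device of the first condition is the one you care about; `ProviderLookup`, `lookup`, `arrayAt` and `stringAt` are hypothetical names. Each step returns an `Either`, so a missing key yields a `Left` naming the failed step instead of an exception.

```scala
import scala.util.Try
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

object ProviderLookup {
  // Hypothetical helper: reads conditions[0].devices[0].provider and reports
  // which step failed instead of throwing. Assumes Scala 2.12+ (right-biased Either).
  def lookup(jsonStr: String): Either[String, String] =
    for {
      json       <- Try(parse(jsonStr)).toOption.toRight("not valid JSON")
      conditions <- arrayAt(json, "conditions")
      condition  <- conditions.headOption.toRight("conditions is empty")
      devices    <- arrayAt(condition, "devices")
      device     <- devices.headOption.toRight("devices is empty")
      provider   <- stringAt(device, "provider")
    } yield provider

  // A missing key comes back from `\` as JNothing, so it is matched here, not caught.
  private def arrayAt(v: JValue, key: String): Either[String, List[JValue]] =
    v \ key match {
      case JArray(xs) => Right(xs)
      case JNothing   => Left(s"$key missing")
      case _          => Left(s"$key is not an array")
    }

  private def stringAt(v: JValue, key: String): Either[String, String] =
    v \ key match {
      case JString(s) => Right(s)
      case JNothing   => Left(s"$key missing")
      case _          => Left(s"$key is not a string")
    }
}
```

To use it from Spark, one option is `udf((s: String) => ProviderLookup.lookup(s).toOption.orNull)`, which stores a SQL NULL for every row where some step fails; logging the `Left` message instead can help track down bad rows.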
Expected output:
+--------+---------+-------------------------+--------+
|event_id|event_key|                   rights|provider|
+--------+---------+-------------------------+--------+
|     410|(unknown)|{"conditions":[{"devic...|  TELETV|
+--------+---------+-------------------------+--------+
But I get the following output:
+--------+---------+-------------------------+---------------------------------------------------------------------------------+
|event_id|event_key|                   rights|                                                                         provider|
+--------+---------+-------------------------+---------------------------------------------------------------------------------+
|     410|(unknown)|{"conditions":[{"devic...|{"provider":"TELETV","provider":"TELETV","provider":"TELETV","provider":"TELETV"}|
+--------+---------+-------------------------+---------------------------------------------------------------------------------+
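The `{"provider":"TELETV",...}` value in this output comes from `\\`: in json4s it searches the whole subtree recursively, and when it finds more than one field with the requested name it returns all of them wrapped in a `JObject`, which `compact(render(...))` prints as an object. The sample has four devices, so four `provider` fields come back. A minimal sketch, assuming json4s with the Jackson backend as in the question and assuming the first provider is the value you want (you might instead want the distinct values): walk the path explicitly with `\`, which only looks at direct children, and pattern-match each step. `ProviderPath`, `elements`, `providers` and `firstProvider` are hypothetical names.

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

object ProviderPath {
  // Hypothetical helper: unwraps a JArray into its elements; anything else
  // (including JNothing for a missing key) becomes an empty list.
  private def elements(v: JValue): List[JValue] = v match {
    case JArray(xs) => xs
    case _          => Nil
  }

  // Every "provider" string at conditions[*].devices[*].provider, in document order.
  // `\` only looks at direct children, so unlike `\\` it never picks up
  // same-named fields from other parts of the document.
  def providers(jsonStr: String): List[String] =
    elements(parse(jsonStr) \ "conditions")
      .flatMap(condition => elements(condition \ "devices"))
      .map(device => device \ "provider")
      .collect { case JString(provider) => provider }

  // The single value the question expects: the first provider, if there is one.
  def firstProvider(jsonStr: String): Option[String] =
    providers(jsonStr).headOption
}
```

Inside the UDF this could become `udf((s: String) => ProviderPath.firstProvider(s).orNull)`, so rows without a provider get a SQL NULL. Note that `parse` still throws on malformed or null input.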
Is there any reason not to use Spark's 'get_json_object'? – Mariusz
The requirement is to use a Scala parser of some kind. –
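For comparison with Mariusz's suggestion: Spark's built-in `get_json_object(column, path)` evaluates a JSON path against a string column and returns `null` when the path does not exist or the JSON is invalid, so it needs neither a UDF nor exception handling. A minimal sketch, assuming the column is named `rights` as in the question and that the first device of the first condition is the one wanted; `ProviderColumn` and `withProvider` are hypothetical names. It does not satisfy the stated Scala-parser requirement, but it shows what the built-in returns for the same path.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, get_json_object}

object ProviderColumn {
  // Hypothetical helper: adds a "provider" column using Spark's own JSON path evaluation.
  // The path is an assumption: it reads the first device of the first condition.
  // get_json_object yields null (not an exception) when the path is missing or the JSON is invalid.
  def withProvider(df: DataFrame): DataFrame =
    df.withColumn("provider", get_json_object(col("rights"), "$.conditions[0].devices[0].provider"))
}
```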