假設,我有在後續的結構線JSON文件中現有的嵌套列的值:PySpark - 添加新的嵌套列或更改
{
"a": 1,
"b": {
"bb1": 1,
"bb2": 2
}
}
我想改變的關鍵bb1
值或添加新密鑰,如:bb3
。 目前,我使用spark.read.json將json文件加載到spark中,作爲DataFrame和df.rdd.map將RDD的每一行映射到dict。然後,更改嵌套鍵值或添加嵌套鍵並將該詞典轉換爲行。最後,將RDD轉換爲DataFrame。 工作流程的工作原理如下:
def map_func(row):
dictionary = row.asDict(True)
adding new key or changing key value
return as_row(dictionary) # as_row convert dict to row recursively
df = spark.read.json("json_file")
df.rdd.map(map_func).toDF().write.json("new_json_file")
這會爲我工作。但我擔心轉換DataFrame - > RDD(行 - >字典 - >行) - > DataFrame會殺死效率。 有沒有其他方法可以滿足這種需求,但不以犧牲效率爲代價?
我使用的最終解決方案是使用withColumn並動態構建b的模式。 首先,我們可以從DF模式得到b_schema
:
b_schema = next(field['type'] for field in df.schema.jsonValue()['fields'] if field['name'] == 'b')
之後,b_schema
是字典,我們可以通過新的字段添加到其中:
b_schema['fields'].append({"metadata":{},"type":"string","name":"bb3","nullable":True})
然後,我們可以把它轉換到StructType由:
new_b = StructType.fromJson(b_schema)
在map_func,我們可以轉換行與dict和填充新的領域:
def map_func(row):
data = row.asDict(True)
data['bb3'] = data['bb1'] + data['bb2']
return data
map_udf = udf(map_func, new_b)
df.withColumn('b', map_udf('b')).collect()
感謝@Mariusz
似乎,map_func會得到一個Row。我如何修改行b。我想將bb1和bb2的總和設置爲新列的值bb3 – ryan
列'b'將由'withColumn('b',something_here)'修改(替換)。在'map_func'裏面你可以訪問'b'的所有變量,例如求和bb1和bb2 udf可以看作如下:'map_udf = udf(lambda數據:{'bb1':data.bb1,'bb2':data。 bb2,'bb3':data.bb1 + data.bb2},new_b)' – Mariusz
謝謝!但是,我們不能硬編碼模式new_b和dict對象,因爲當數據結構發生變化時,我們需要更改代碼?我們可以從列「b」推斷出模式並動態構建字典對象嗎? – ryan