
Data modification in pyspark

I am using Spark 2.0.1 and Python 2.7 to modify and flatten some nested JSON data. The raw records look like this:

{
  "created": "28-12-2001T12:02:01.143",
  "class": "Class_A",
  "sub_class": "SubClass_B",
  "properties": {
    "meta": "some-info",
    ...,
    "interests": {"key1": "value1", "key2": "value2", ..., "keyN": "valueN"}
  }
}

Using withColumn and a udf I was able to flatten raw_data into a dataframe that looks like the following (one way to do this step is sketched after the table):

------------------------------------------------------------------------------------------------------------------------------
| created                 | class   | sub_class  | meta        | interests                                                   |
------------------------------------------------------------------------------------------------------------------------------
| 28-12-2001T12:02:01.143 | Class_A | SubClass_B | 'some-info' | {'key1': 'value1', 'key2': 'value2', ..., 'keyN': 'valueN'} |
------------------------------------------------------------------------------------------------------------------------------
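For illustration, here is a minimal sketch of one way this flattening step could be done without a udf, assuming the raw JSON is read with an explicit schema that declares interests as a map. The file name raw_data.json, the name raw_df, and the schema itself are assumptions, not part of the original setup:

from pyspark.sql import SQLContext
from pyspark.sql.types import MapType, StringType, StructField, StructType

sqlContext = SQLContext(sc)  # sc: an existing SparkContext

# Declaring interests as MapType(String, String) keeps arbitrary keys
# in a single column instead of one struct field per key.
schema = StructType([
    StructField("created", StringType()),
    StructField("class", StringType()),
    StructField("sub_class", StringType()),
    StructField("properties", StructType([
        StructField("meta", StringType()),
        StructField("interests", MapType(StringType(), StringType())),
    ])),
])

raw_df = sqlContext.read.json("raw_data.json", schema=schema)

# Pull the nested fields up to the top level.
flat_df = raw_df.select(
    "created",
    "class",
    "sub_class",
    raw_df.properties.meta.alias("meta"),
    raw_df.properties.interests.alias("interests"),
)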

Now I want to transform the raw data (JSON format), splitting this single row into multiple rows based on the interests column. How can I do that?

Desired output:

--------------------------------------------------------------------------------
| created                 | class   | sub_class  | meta        | key  | value  |
--------------------------------------------------------------------------------
| 28-12-2001T12:02:01.143 | Class_A | SubClass_B | 'some-info' | key1 | value1 |
| 28-12-2001T12:02:01.143 | Class_A | SubClass_B | 'some-info' | key2 | value2 |
| 28-12-2001T12:02:01.143 | Class_A | SubClass_B | 'some-info' | keyN | valueN |
--------------------------------------------------------------------------------

Thanks.

Answer


Use explode.

Here is a complete example (most of it is just getting the data in place):

import ast

import pandas as pd
import pyspark.sql.functions as sql
from pyspark.sql import SQLContext

# sc is assumed to exist already (e.g. the pyspark shell provides it);
# otherwise build one with SparkContext().
sqlContext = SQLContext(sc)

# Recreate the flattened row; the last field holds the interests dict.
s = "28-12-2001T12:02:01.143|Class_A|SubClass_B|some-info|{'key1': 'value1', 'key2': 'value2', 'keyN': 'valueN'}"
data = s.split('|')
data = data[:-1] + [ast.literal_eval(data[-1])]  # parse the dict literal safely instead of eval

# Build a one-row pandas frame; the dict stays intact in its cell and
# Spark infers it as a MapType column.
p_df = pd.DataFrame([data])
s_df = sqlContext.createDataFrame(p_df, schema=['created', 'class', 'sub_class', 'meta', 'interests'])

# explode on a map column produces one row per key/value pair.
s_df.select(s_df.columns[:-1] + [sql.explode(s_df.interests).alias("key", "value")]).show()
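This yields one row per key/value pair of the interests map, matching the desired output. The alias("key", "value") call names the two columns that explode produces for a map column; applied to an array column, explode would instead produce a single column named col with one row per element. Note that the order of rows coming out of a map is not guaranteed.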