2017-07-12 76 views

Answers

0

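Although I have no solution for PySpark, maybe it is easier to translate this Scala code into Python. Consider a DataFrame df with the following schema: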

root 
|-- employee: struct (nullable = false) 
| |-- name: string (nullable = false) 
| |-- age: integer (nullable = false) 

Then, if you want to drop name, for example, you can do:

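import org.apache.spark.sql.functions.struct
// assumes spark.implicits._ is in scope for the $"..." syntax

val fieldsToKeep = df.select($"employee.*").columns
  .filter(_ != "name") // the nested column you want to drop
  .map(n => "employee." + n)

// overwrite the column with the subset of fields
df
  .withColumn("employee", struct(fieldsToKeep.head, fieldsToKeep.tail: _*))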
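
The same field filtering can be sketched in Python. The helper name fields_to_keep is hypothetical, and the PySpark lines appear only as comments because they need a running Spark session; treat them as an assumed usage rather than part of the original answer:

```python
def fields_to_keep(columns, parent, unwanted):
    """Return qualified names of the nested fields of `parent` to keep.

    `columns` is the list of field names inside the struct, e.g. the
    result of df.select("employee.*").columns in PySpark.
    """
    return [f"{parent}.{name}" for name in columns if name not in unwanted]


# Field names of the example schema above, written out by hand.
kept = fields_to_keep(["name", "age"], "employee", {"name"})
print(kept)  # ['employee.age']

# Assumed PySpark usage (requires a SparkSession and a DataFrame df):
#   from pyspark.sql import functions as F
#   cols = df.select("employee.*").columns
#   df = df.withColumn("employee", F.struct(*fields_to_keep(cols, "employee", {"name"})))
```

Note that if every field is filtered out the list is empty, so the struct call would have nothing to build from.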
0

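A method I found using PySpark is to first convert the nested column to JSON, and then parse the converted JSON with a new nested schema that has the unwanted columns filtered out.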

Suppose I have the following schema and I want to drop d and e (a.b.d, a.e) from the DataFrame:

root 
|-- a: struct (nullable = true) 
| |-- b: struct (nullable = true) 
| | |-- c: long (nullable = true) 
| | |-- d: string (nullable = true) 
| |-- e: struct (nullable = true) 
| | |-- f: long (nullable = true) 
| | |-- g: string (nullable = true) 
|-- h: string (nullable = true) 

I used the following approach:

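  1. Create a new schema for a that excludes d and e. A quick way is to manually pick the fields you want from df.select("a").schema and build a new schema from them with StructType. Alternatively, you can do it programmatically by traversing the schema tree and excluding the unwanted fields, something like this: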

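    from pyspark.sql.types import StructField, StructType

    def exclude_nested_field(schema, unwanted_fields, parent=""):
        """Return a copy of schema without the dotted field names in unwanted_fields."""
        new_schema = []

        for field in schema:
            # Build the fully qualified name, e.g. "a.b.d"
            full_field_name = field.name
            if parent:
                full_field_name = parent + "." + full_field_name

            if full_field_name not in unwanted_fields:
                if isinstance(field.dataType, StructType):
                    # Recurse into nested structs
                    inner_schema = exclude_nested_field(field.dataType, unwanted_fields, full_field_name)
                    new_schema.append(StructField(field.name, inner_schema))
                else:
                    new_schema.append(StructField(field.name, field.dataType))

        return StructType(new_schema)

    # Start from the struct type of a itself (df.select("a").schema would wrap a in an
    # extra top-level field), so that new_schema matches the output of to_json("a")
    new_schema = exclude_nested_field(df.schema["a"].dataType, ["a.b.d", "a.e"], parent="a")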
    
  2. Convert the a column to JSON: F.to_json("a")

  3. Parse the JSON-converted a column from step 2 with the new schema found in step 1: F.from_json("a_json", new_schema)
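
Steps 2 and 3 work because parsing JSON against a schema keeps only the fields that the schema names. The plain-Python sketch below imitates that round trip with the standard json module. The nested-dict schema and the helper parse_with_schema are hypothetical stand-ins for StructType and F.from_json, not Spark APIs. The commented lines at the end show how the real calls might be chained, assuming the intermediate column is named a_json as in step 3:

```python
import json


def parse_with_schema(value, schema):
    """Keep only the keys named in `schema`; a dict entry means a nested struct."""
    if value is None:
        return None
    result = {}
    for name, inner in schema.items():
        field = value.get(name)
        if isinstance(inner, dict):
            result[name] = parse_with_schema(field, inner)
        else:
            result[name] = field
    return result


# One row of column a from the example schema, and a pruned schema without b.d and e.
row_a = {"b": {"c": 1, "d": "x"}, "e": {"f": 2, "g": "y"}}
pruned_schema = {"b": {"c": "long"}}

a_json = json.dumps(row_a)                                    # step 2: struct -> JSON string
new_a = parse_with_schema(json.loads(a_json), pruned_schema)  # step 3: JSON -> pruned struct
print(new_a)  # {'b': {'c': 1}}

# Assumed PySpark equivalent (needs a SparkSession and new_schema from step 1):
#   from pyspark.sql import functions as F
#   df = (df.withColumn("a_json", F.to_json("a"))
#           .withColumn("a", F.from_json("a_json", new_schema))
#           .drop("a_json"))
```

The top-level column h is untouched by this approach, since only a is serialized and parsed again.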