2015-09-22 89 views
14

如何從 Spark DataFrame 中刪除嵌套列？我有一個具有以下模式的 DataFrame：

root 
|-- label: string (nullable = true) 
|-- features: struct (nullable = true) 
| |-- feat1: string (nullable = true) 
| |-- feat2: string (nullable = true) 
| |-- feat3: string (nullable = true) 

雖然我能夠使用以下方式進行過濾：

val data = rawData 
    .filter(!(rawData("features.feat1") <=> "100")) 

我無法使用丟棄的列中的數據幀

val data = rawData 
     .drop("features.feat1") 

這是我在這裏做錯了嗎?我也嘗試過(不成功)做drop(rawData("features.feat1")),儘管這樣做沒有多大意義。

由於提前,

尼基爾

+0

如果將它映射到一個新的數據框呢?我不認爲DataFrame API允許你在結構列類型中刪除結構字段。 –

+0

哦。我會嘗試的,但如果我必須映射只是爲了解決一個嵌套的列名稱,這樣看起來很不方便:(。 –

+0

你可以隨時使用DataFrame的'.columns()'方法獲得所有列,從序列中刪除不需要的列並執行'select(myColumns:_ *)'。應該稍微短一點。 – Niemand

回答

12

這僅僅是一個編程的工作,但你可以嘗試這樣的事:

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{functions => f}
import scala.util.{Failure, Success, Try}

/**
 * Wrapper around a DataFrame that can drop fields from a top-level struct column.
 */
case class DFWithDropFrom(df: DataFrame) {
    /**
     * Finds the top-level field named `source`.
     *
     * Fix: the original used `filter(...).head`, a partial operation whose failure
     * surfaces as an opaque "head of empty list"; `find` + explicit Failure keeps
     * the same exception type (NoSuchElementException) with a useful message.
     */
    def getSourceField(source: String): Try[StructField] = {
    df.schema.fields.find(_.name == source) match {
      case Some(field) => Success(field)
      case None        => Failure(new NoSuchElementException(s"no top-level column named '$source'"))
    }
    }

    /**
     * Narrows a field's type to StructType. Non-struct columns yield a
     * Failure(ClassCastException), matching the original `asInstanceOf` behavior.
     */
    def getType(sourceField: StructField): Try[StructType] = {
    sourceField.dataType match {
      case st: StructType => Success(st)
      case other          => Failure(new ClassCastException(
        s"column '${sourceField.name}' has type ${other.simpleString}, not struct"))
    }
    }

    /** Rebuilds the struct column `source` keeping only the fields in `names`. */
    def genOutputCol(names: Array[String], source: String): Column = {
    f.struct(names.map(n => f.col(source).getItem(n).alias(n)): _*)
    }

    /**
     * Returns a DataFrame in which the struct column `source` no longer contains
     * the fields listed in `toDrop`. If `source` does not exist or is not a
     * struct, the original DataFrame is returned unchanged (best-effort, as in
     * the original answer).
     */
    def dropFrom(source: String, toDrop: Array[String]): DataFrame = {
    getSourceField(source)
      .flatMap(getType)
      .map(_.fieldNames.diff(toDrop))
      .map(genOutputCol(_, source))
      .map(df.withColumn(source, _))
      .getOrElse(df)
    }
}

用法示例:

scala> case class features(feat1: String, feat2: String, feat3: String) 
defined class features 

scala> case class record(label: String, features: features) 
defined class record 

scala> val df = sc.parallelize(Seq(record("a_label", features("f1", "f2", "f3")))).toDF 
df: org.apache.spark.sql.DataFrame = [label: string, features: struct<feat1:string,feat2:string,feat3:string>] 

scala> DFWithDropFrom(df).dropFrom("features", Array("feat1")).show 
+-------+--------+ 
| label|features| 
+-------+--------+ 
|a_label| [f2,f3]| 
+-------+--------+ 


scala> DFWithDropFrom(df).dropFrom("foobar", Array("feat1")).show 
+-------+----------+ 
| label| features| 
+-------+----------+ 
|a_label|[f1,f2,f3]| 
+-------+----------+ 


scala> DFWithDropFrom(df).dropFrom("features", Array("foobar")).show 
+-------+----------+ 
| label| features| 
+-------+----------+ 
|a_label|[f1,f2,f3]| 
+-------+----------+ 

添加implicit conversion,你很好走。

9

該版本允許你在任何級別刪除嵌套列:

import org.apache.spark.sql._ 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.types.{StructType, DataType} 

/** 
    * Various Spark utilities and extensions of DataFrame 
    */ 
/**
 * Spark utilities: drop a dot-separated nested field from a DataFrame.
 * NOTE(review): per the comment thread below, the rebuilt parent struct comes
 * back with nullable = false rather than preserving the original nullability.
 */
object DataFrameUtils {

    /**
     * Recursively rebuilds `col` (whose type is `colType` and whose full
     * dot-separated path is `fullColName`) without the field `dropColName`.
     * Returns None exactly when `col` itself is the column being dropped.
     */
    private def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] = {
    colType match {
      case _ if fullColName == dropColName =>
      // This is the column to remove: signal the caller to omit it.
      None
      case structType: StructType if dropColName.startsWith(s"$fullColName.") =>
      // Target lives somewhere inside this struct: rebuild it from the
      // children that survive the recursive drop.
      val survivors = structType.fields.flatMap { field =>
        dropSubColumn(col.getField(field.name), field.dataType,
        s"$fullColName.${field.name}", dropColName)
        .map(_.alias(field.name))
      }
      Some(struct(survivors: _*))
      case _ =>
      // Not on the drop path (or not a struct): keep the column as-is.
      Some(col)
    }
    }

    /**
     * Drops `colName` (dot-separated) from `df`. Top-level struct columns that
     * contain the target are rebuilt without it; a plain top-level name is
     * handled by the initial `df.drop`.
     */
    protected def dropColumn(df: DataFrame, colName: String): DataFrame = {
    val rebuiltTopLevel = for {
      field   <- df.schema.fields.toSeq
      if colName.startsWith(s"${field.name}.")
      rebuilt <- dropSubColumn(col(field.name), field.dataType, field.name, colName)
    } yield field.name -> rebuilt

    rebuiltTopLevel.foldLeft(df.drop(colName)) {
      case (acc, (name, column)) => acc.withColumn(name, column)
    }
    }

    /**
     * Extended version of DataFrame that allows to operate on nested fields
     */
    implicit class ExtendedDataFrame(df: DataFrame) extends Serializable {
    /**
     * Drops nested field from DataFrame
     *
     * @param colName Dot-separated nested field name
     */
    def dropNestedColumn(colName: String): DataFrame =
      DataFrameUtils.dropColumn(df, colName)
    }
}

用法:

import DataFrameUtils._ 
df.dropNestedColumn("a.b.c.d") 
+1

非常感謝!你有任何機會更新這個從陣列下的數組結構中刪除一個字段?過了一天的黑客攻擊,關閉但無法得到它。即父:array >>> –

+1

@alexP_Keaton嗨,你有沒有得到一個解決方案,將列放入數組中? –

+0

我想補充說,這種方法不保留修改後的父結構的'可空'屬性。在這個例子中,'features'將變成'struct(nullable = false)' –

2

繼 spektom 的 Scala 代碼片段之後，我在 Java 中寫了類似的代碼。由於 Java 8 沒有 foldLeft，我改用了 forEachOrdered。此代碼適用於 Spark 2.x（我用的是 2.1）。另外我注意到，先刪除列再用 withColumn 以相同名稱添加回去並不起作用，所以我直接替換該列，這樣看起來可行。

代碼沒有經過充分測試,希望它工作:-)

// Java 8 port of the Scala nested-column-drop utility for Spark 2.x.
// NOTE(review): the closing brace of this class is missing from the snippet as
// posted — it appears to have been lost in the page formatting.
public class DataFrameUtils { 

// Returns a Dataset with the dot-separated nested column `columnName` removed.
// Top-level struct columns whose name prefixes the target path are rebuilt
// without the target field; DataFrameFolder accumulates the replacements.
public static Dataset<Row> dropNestedColumn(Dataset<Row> dataFrame, String columnName) { 
    final DataFrameFolder dataFrameFolder = new DataFrameFolder(dataFrame); 
    Arrays.stream(dataFrame.schema().fields()) 
     .flatMap(f -> { 
      // Only a top-level field that is a prefix of the target path can contain it.
      if (columnName.startsWith(f.name() + ".")) { 
       final Optional<Column> column = dropSubColumn(col(f.name()), f.dataType(), f.name(), columnName); 
       if (column.isPresent()) { 
        return Stream.of(new Tuple2<>(f.name(), column)); 
       } else { 
        return Stream.empty(); 
       } 
      } else { 
       return Stream.empty(); 
      } 
     }).forEachOrdered(colTuple -> dataFrameFolder.accept(colTuple)); 

    return dataFrameFolder.getDF(); 
} 

// Recursively rebuilds `col` without the field at `dropColumnName`.
// Empty Optional means `col` itself is the column being dropped.
private static Optional<Column> dropSubColumn(Column col, DataType colType, String fullColumnName, String dropColumnName) { 
    Optional<Column> column = Optional.empty(); 
    if (!fullColumnName.equals(dropColumnName)) { 
     if (colType instanceof StructType) { 
      if (dropColumnName.startsWith(fullColumnName + ".")) { 
       // Target is inside this struct: rebuild it from surviving children.
       column = Optional.of(struct(getColumns(col, (StructType)colType, fullColumnName, dropColumnName))); 
      } 
     } else { 
      // Non-struct column not matching the target: keep as-is.
      column = Optional.of(col); 
     } 
    } 

    return column; 
} 

// Collects the struct's child columns that survive the recursive drop,
// re-aliased to their original field names.
private static Column[] getColumns(Column col, StructType colType, String fullColumnName, String dropColumnName) { 
    return Arrays.stream(colType.fields()) 
     .flatMap(f -> { 
        final Optional<Column> column = dropSubColumn(col.getField(f.name()), f.dataType(), 
          fullColumnName + "." + f.name(), dropColumnName); 
        if (column.isPresent()) { 
         return Stream.of(column.get().alias(f.name())); 
        } else { 
         return Stream.empty(); 
        } 
       } 
     ).toArray(Column[]::new); 

} 

// Mutable fold helper standing in for Scala's foldLeft: applies each
// (name, replacement) pair to the Dataset in encounter order.
private static class DataFrameFolder implements Consumer<Tuple2<String, Optional<Column>>> { 
    private Dataset<Row> df; 

    public DataFrameFolder(Dataset<Row> df) { 
     this.df = df; 
    } 

    public Dataset<Row> getDF() { 
     return df; 
    } 

    @Override 
    public void accept(Tuple2<String, Optional<Column>> colTuple) { 
     if (!colTuple._2().isPresent()) { 
      df = df.drop(colTuple._1()); 
     } else { 
      df = df.withColumn(colTuple._1(), colTuple._2().get()); 
     } 
    } 
} 

用例:

// Test fixture: bean with primitive fields plus a nested Pojo2, used only to
// derive the example DataFrame schema via createDataFrame(..., Pojo.class).
private class Pojo { 
    private String str; 
    private Integer number; 
    private List<String> strList; 
    private Pojo2 pojo2; 

    public String getStr() { 
     return str; 
    } 

    public Integer getNumber() { 
     return number; 
    } 

    public List<String> getStrList() { 
     return strList; 
    } 

    public Pojo2 getPojo2() { 
     return pojo2; 
    } 

} 

// Test fixture: the nested bean — becomes the struct column "pojo2" in the
// example schema below.
private class Pojo2 { 
    private String str; 
    private Integer number; 
    private List<String> strList; 

    public String getStr() { 
     return str; 
    } 

    public Integer getNumber() { 
     return number; 
    } 

    public List<String> getStrList() { 
     return strList; 
    } 

} 

// Usage: build an empty DataFrame from the Pojo bean schema, then drop the
// nested field pojo2.str (schemas before/after are shown below).
SQLContext context = new SQLContext(new SparkContext("local[1]", "test")); 
Dataset<Row> df = context.createDataFrame(Collections.emptyList(), Pojo.class); 
Dataset<Row> dfRes = DataFrameUtils.dropNestedColumn(df, "pojo2.str"); 

原始結構:

root 
|-- number: integer (nullable = true) 
|-- pojo2: struct (nullable = true) 
| |-- number: integer (nullable = true) 
| |-- str: string (nullable = true) 
| |-- strList: array (nullable = true) 
| | |-- element: string (containsNull = true) 
|-- str: string (nullable = true) 
|-- strList: array (nullable = true) 
| |-- element: string (containsNull = true) 

下降後:

root 
|-- number: integer (nullable = true) 
|-- pojo2: struct (nullable = false) 
| |-- number: integer (nullable = true) 
| |-- strList: array (nullable = true) 
| | |-- element: string (containsNull = true) 
|-- str: string (nullable = true) 
|-- strList: array (nullable = true) 
| |-- element: string (containsNull = true) 
+0

添加一個簡單的示例來說明如何調用它,我將upvote you – xXxpRoGrAmmErxXx

+1

每個@xXxpRoGrAmmErxXx請求添加使用示例 –

1

擴展spektom答案。支持陣列類型:

/**
 * Spark utilities: drop a dot-separated nested field from a DataFrame,
 * including fields inside array-of-struct columns.
 */
object DataFrameUtils {

    /**
     * Recursively rebuilds `col` (type `colType`, full path `fullColName`)
     * without the field `dropColName`. None means `col` itself is the target.
     *
     * Fix: the inner match on `colType.elementType` handled only StructType and
     * threw a MatchError at runtime when the drop path pointed into an array of
     * non-struct elements; it now falls back to keeping the column unchanged.
     */
    private def dropSubColumn(col: Column, colType: DataType, fullColName: String, dropColName: String): Option[Column] = {
    if (fullColName.equals(dropColName)) {
      None
    } else if (dropColName.startsWith(s"$fullColName.")) {
      colType match {
      case colType: StructType =>
        // Rebuild the struct from the children that survive the drop.
        Some(struct(
        colType.fields
          .flatMap(f =>
          dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName)
            .map(_.alias(f.name)))
          : _*))
      case colType: ArrayType =>
        colType.elementType match {
        case innerType: StructType =>
          // NOTE(review): getField on an array-of-structs yields one array per
          // child field, so wrapping the results in struct() changes the column
          // shape from array<struct<...>> to struct<array, ...> — confirm this
          // is the intended result before relying on it (see comment thread).
          Some(struct(innerType.fields
          .flatMap(f =>
            dropSubColumn(col.getField(f.name), f.dataType, s"$fullColName.${f.name}", dropColName)
            .map(_.alias(f.name)))
          : _*))
        case _ =>
          // Arrays of non-struct elements cannot contain the nested target;
          // keep the column as-is instead of crashing with a MatchError.
          Some(col)
        }

      case other => Some(col)
      }
    } else {
      Some(col)
    }
    }

    /**
     * Drops `colName` (dot-separated) from `df`: rebuilds any top-level column
     * that contains the target; plain top-level names are removed by `df.drop`.
     */
    protected def dropColumn(df: DataFrame, colName: String): DataFrame = {
    df.schema.fields
      .flatMap(f => {
      if (colName.startsWith(s"${f.name}.")) {
        dropSubColumn(col(f.name), f.dataType, f.name, colName)
        .map(rebuilt => (f.name, rebuilt))
      } else {
        None
      }
      })
      .foldLeft(df.drop(colName)) {
      case (df, (colName, column)) => df.withColumn(colName, column)
      }
    }

    /**
     * Extended version of DataFrame that allows to operate on nested fields
     */
    implicit class ExtendedDataFrame(df: DataFrame) extends Serializable {
    /**
     * Drops nested field from DataFrame
     *
     * @param colName Dot-separated nested field name
     */
    def dropNestedColumn(colName: String): DataFrame = {
      DataFrameUtils.dropColumn(df, colName)
    }
    }

}