2017-08-16 78 views
1

我有我想用用獲取星火數據集嵌套數組的最小值

Dataset<Row> df = spark.read().json(args[0]); 

星火2.2.0和Java API,這是我轉換成一個數據集。然後分析一個JSON服務器的日誌文件,它生成以下模式:

df.printschema(); 

root 
|-- timestamp: long (nullable = true) 
|-- results: struct (nullable = true) 
| |-- entities: array (nullable = true) 
| | |-- element: struct (containsNull = true) 
| | | |-- entity_id: string (nullable = true) 
| | | |-- score: long (nullable = true) 
| | | |-- is_available: boolean (nullable = true) 
| |-- number_of_results: long (nullable = true) 

我想得分最低的實體,這是可用的,所以我會得到一個數據集類似於:

root 
|-- timestamp: long (nullable = true) 
|-- results: struct (nullable = true) 
| |-- entity: struct (containsNull = true) 
| | |-- entity_id: string (nullable = true) 
| | |-- score: long (nullable = true) 
| | |-- is_available: boolean (nullable = true) 

我該如何做這個轉變?

回答

0

你可以適用於你的陣列列一個用戶定義的函數:

// Define the UDF that takes the min of array 
UDF1<Seq<Row>, Row> getElement = seq -> { 
    Row bestRow = null; 
    long bestRowScore = Long.MAX_VALUE; 
    for (Row r : JavaConversions.seqAsJavaList(seq)){ 
     if (r.getBoolean(1) && r.getLong(2)<bestRowScore){ 
      bestRow = r; 
      bestRowScore = r.getLong(2); 
     } 
    } 
    return bestRow; 
}; 

// Define the return type of UDF 
ArrayType arrayType = (ArrayType) df.select(df.col("results.entities")).schema().fields()[0].dataType(); 
DataType elementType = arrayType.elementType(); 

// Register UDF 
sparkSession.udf().register("getElement", getElement, elementType); 

// Apply UDF on dataset 
Dataset<Row> transformedDF = df.select(df.col("timestamp"),functions.callUDF("getElement", df.col("results.entities"))); 
transformedDF.printSchema(); 
0

您可以使用窗口函數(例如行號),以實現這一目標:

df.registerTempTable("df"); 
val minPerEntityDF = spark.sql("SELECT *, row_number() over (partition by entity.entity_id order by score asc) as rn 
FROM df") 
.filter("rn = 1")