2016-07-25 28 views
6

我在Scala中使用Spark,並且我的聚合列是匿名的。有沒有一種方便的方法來重命名數據集中的多個列?我想加一個as的模式,但關鍵列是一個結構(由於groupBy操作),我不知道如何定義一個case classStructType在其中。如何命名聚合列?

我試着定義模式如下:

val returnSchema = StructType(StructField("edge", StructType(StructField("src", IntegerType, true), 
                  StructField("dst", IntegerType), true)), 
           StructField("count", LongType, true)) 
edge_count.as[returnSchema] 

但我得到一個編譯錯誤:

Message: <console>:74: error: overloaded method value apply with alternatives: 
    (fields: Array[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and> 
    (fields: java.util.List[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and> 
    (fields: Seq[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType 
cannot be applied to (org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField, Boolean) 
     val returnSchema = StructType(StructField("edge", StructType(StructField("src", IntegerType, true), 
+0

你能告訴我們的代碼?那麼也許我可以制定一個更好的方法? –

+0

假裝你有一個包含三列的數據集。前兩名分組,第三名計數。關鍵是一個元組。我在Spark 1.6.2上。謝謝@AlbertoBonsanto! – Emre

回答

0

我結束了使用alias es與select聲明;例如,

ds.select($"key.src".as[Short], 
      $"key.dst".as[Short], 
      $"sum(count)".alias("count").as[Long]) 

首先,我不得不使用printSchema確定派生列名:

> ds.printSchema 

root 
|-- key: struct (nullable = false) 
| |-- src: short (nullable = false) 
| |-- dst: short (nullable = false) 
|-- sum(count): long (nullable = true) 
6

最好的解決辦法是明確地命名列,例如,

df 
    .groupBy('a, 'b) 
    .agg(
    expr("count(*) as cnt"), 
    expr("sum(x) as x"), 
    expr("sum(y)").as("y") 
) 

如果您正在使用數據集,則必須提供列的類型,例如expr("count(*) as cnt").as[Long]

您可以直接使用DSL,但我經常發現它比簡單的SQL表達式更冗長。

如果要執行批量重命名,請使用Map,然後使用foldLeft數據幀。

+0

這給我一個類型不匹配的錯誤;輸入是一個數據集。 – Emre

+0

這是因爲'expr()'返回'Column'並且你需要在數據集API中有一個'TypedColumn'。我更新了答案以顯示數據集示例。 – Sim