2017-09-27 21 views
0

我與DataFrames工作的哪些元素都得到了類似的模式:星火 - 遞歸函數爲UDF生成異常

root 
|-- NPAData: struct (nullable = true) 
| |-- NPADetails: struct (nullable = true) 
| | |-- location: string (nullable = true) 
| | |-- manager: string (nullable = true) 
| |-- service: array (nullable = true) 
| | |-- element: struct (containsNull = true) 
| | | |-- serviceName: string (nullable = true) 
| | | |-- serviceCode: string (nullable = true) 
|-- NPAHeader: struct (nullable = true) 
| | |-- npaNumber: string (nullable = true) 
| | |-- date: string (nullable = true) 

在我的數據幀我想組具有相同NPAHeader.code的所有元素,所以做到這一點,我用下面一行:

val groupedNpa = orderedNpa.groupBy($"NPAHeader.code").agg(collect_list(struct($"NPAData",$"NPAHeader")).as("npa")) 

我有以下模式的數據幀之後:

StructType(StructField(npaNumber,StringType,true), StructField(npa,ArrayType(StructType(StructField(NPAData...))))) 

每一行的一個例子是類似的東西:

[1234,WrappedArray([npaNew,npaOlder,...npaOldest])] 

現在,我要的是生成與他又拿起另一個數據框只是在WrappedArray的元素之一,所以我想類似的輸出:

[1234,npaNew] 

注意:從WrappedArray中選擇的元素是遍歷整個WrappedArray後匹配完整的邏輯的元素。但爲了簡化這個問題,我會總是拾取WrappedArray的最後一個元素(,在遍歷它後重復)。

要做到這一點,我想定義一個recurside UDF

import org.apache.spark.sql.functions.udf 

def returnRow(elementList : Row)(index:Int): Row = { 
    val dif = elementList.size - index 
    val row :Row = dif match{ 
    case 0 => elementList.getAs[Row](index) 
    case _ => returnRow(elementList)(index + 1) 
    } 
    row 
} 

val returnRow_udf = udf(returnRow _) 


groupedNpa.map{row => (row.getAs[String]("npaNumber"),returnRow_udf(groupedNpa("npa")(0)))} 

但我收到以下錯誤的地圖:

異常線程 「main」 java.lang.UnsupportedOperationException : 類型Int的架構=>不支持單元

我在做什麼錯?

順便說一下,我不確定我是否正確傳遞npagroupedNpa("npa")。我accesing的WrappedArray作爲行,因爲我不知道如何通過Array[Row]迭代(在get(index)方法不存在於數組[行])

回答

1

TL; DR只需使用的中描述的方法之一How to select the first row of each group?

如果你想使用複雜的邏輯,並返回可以跳過SQL API和使用groupByKey

val f: (String, Iterator[org.apache.spark.sql.Row]) => Row 
val encoder: Encoder 
df.groupByKey(_.getAs[String]("NPAHeader.code")).mapGroups(f)(encoder) 

或更好:

val g: (Row, Row) => Row 

df.groupByKey(_.getAs[String]("NPAHeader.code")).reduceGroups(g) 

其中encoder是有效的RowEncoderEncoder error while trying to map dataframe row to updated row)。

你的代碼是在多個方面有缺陷:

  • groupBy不保證值的順序。所以:

    orderBy(...).groupBy(....).agg(collect_list(...)) 
    

    可以有非確定性輸出。如果你真的決定走這條路線,你應該跳過orderBy並明確排列收集的數組。

  • 您無法將咖喱功能傳遞給udf。你必須先解除它,但它需要不同的參數順序(見下面的例子)。

  • 如果你能,這可能是正確的方法來調用它(請注意,你忽略第二個參數):

    returnRow_udf(groupedNpa("npa")(0)) 
    

    更糟糕的是,你怎麼稱呼它裏面map,其中udfs不完全適用。

  • udf不能返回。它必須返回external Scala type

  • array<struct>的外部表示是Seq[Row]。你不能用來代替它。
  • SQL陣列可以通過索引與apply來訪問:

    df.select($"array"(size($"array") - 1)) 
    

    但它不是一個正確的方法由於非確定性。您可以申請sort_array,但正如開頭所指出的那樣,有更有效的解決方案。

  • 令人驚訝的是,遞歸併不那麼相關。你可以設計功能是這樣的:

    def size(i: Int=0)(xs: Seq[Any]): Int = xs match { 
        case Seq() => i 
        case null => i 
        case Seq(h, t @ _*) => size(i + 1)(t) 
    } 
    
    val size_ = udf(size() _) 
    

    ,它會工作得很好:

    Seq((1, Seq("a", "b", "c"))).toDF("id", "array") 
        .select(size_($"array")) 
    

    雖然遞歸是一種矯枉過正,如果你可以遍歷Seq

+0

我想改變我的代碼儘可能地按照您的指示,但我是被迫向上移動到我的一些原始的辦法,因爲在那裏我需要運行這個具有星火1.6,按我的理解服務器groupByKey,mapGroups和reduceGroups,這將很容易我的生活很多,不能在該版本中使用。 –

+0

這是新的情況,如果你想要checl。 https://stackoverflow.com/q/46463931/1773841我做了幾個更新,這就是爲什麼我更願意問一個不同的問題,而不是一次又一次地更新。我在Window()中添加partitionBy和orderBy以避免您指出的問題。我使用了一個「普通」函數,希望能夠從地圖中調用,所以我不會在返回類型中有限制。我知道RDD可以完成這件事,但我對DF並不樂觀。 –