我與DataFrames工作的哪些元素都得到了類似的模式:星火 - 遞歸函數爲UDF生成異常
root
|-- NPAData: struct (nullable = true)
| |-- NPADetails: struct (nullable = true)
| | |-- location: string (nullable = true)
| | |-- manager: string (nullable = true)
| |-- service: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- serviceName: string (nullable = true)
| | | |-- serviceCode: string (nullable = true)
|-- NPAHeader: struct (nullable = true)
| | |-- npaNumber: string (nullable = true)
| | |-- date: string (nullable = true)
在我的數據幀我想組具有相同NPAHeader.code
的所有元素,所以做到這一點,我用下面一行:
val groupedNpa = orderedNpa.groupBy($"NPAHeader.code").agg(collect_list(struct($"NPAData",$"NPAHeader")).as("npa"))
我有以下模式的數據幀之後:
StructType(StructField(npaNumber,StringType,true), StructField(npa,ArrayType(StructType(StructField(NPAData...)))))
每一行的一個例子是類似的東西:
[1234,WrappedArray([npaNew,npaOlder,...npaOldest])]
現在,我要的是生成與他又拿起另一個數據框只是在WrappedArray的元素之一,所以我想類似的輸出:
[1234,npaNew]
注意:從WrappedArray中選擇的元素是遍歷整個WrappedArray後匹配完整的邏輯的元素。但爲了簡化這個問題,我會總是拾取WrappedArray的最後一個元素(,在遍歷它後重復)。
要做到這一點,我想定義一個recurside UDF
import org.apache.spark.sql.functions.udf
def returnRow(elementList : Row)(index:Int): Row = {
val dif = elementList.size - index
val row :Row = dif match{
case 0 => elementList.getAs[Row](index)
case _ => returnRow(elementList)(index + 1)
}
row
}
val returnRow_udf = udf(returnRow _)
groupedNpa.map{row => (row.getAs[String]("npaNumber"),returnRow_udf(groupedNpa("npa")(0)))}
但我收到以下錯誤的地圖:
異常線程 「main」 java.lang.UnsupportedOperationException : 類型Int的架構=>不支持單元
我在做什麼錯?
順便說一下,我不確定我是否正確傳遞npa
列groupedNpa("npa")
。我accesing的WrappedArray作爲行,因爲我不知道如何通過Array[Row]
迭代(在get(index)
方法不存在於數組[行])
我想改變我的代碼儘可能地按照您的指示,但我是被迫向上移動到我的一些原始的辦法,因爲在那裏我需要運行這個具有星火1.6,按我的理解服務器groupByKey,mapGroups和reduceGroups,這將很容易我的生活很多,不能在該版本中使用。 –
這是新的情況,如果你想要checl。 https://stackoverflow.com/q/46463931/1773841我做了幾個更新,這就是爲什麼我更願意問一個不同的問題,而不是一次又一次地更新。我在Window()中添加partitionBy和orderBy以避免您指出的問題。我使用了一個「普通」函數,希望能夠從地圖中調用,所以我不會在返回類型中有限制。我知道RDD可以完成這件事,但我對DF並不樂觀。 –