我不清楚你之前具體嘗試了什麼，不過可以參考下面的解決方案，應該能解決你的問題。
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType
from pyspark.sql.functions import udf
# Schema columns: WindowID (int), State (string), and Details, a list of
# integer arrays (each sample entry below is a two-element [int, int] array).
dfSchema = StructType(
    [
        StructField('WindowID', IntegerType(), True),
        StructField('State', StringType(), True),
        StructField('Details', ArrayType(ArrayType(IntegerType())), True),
    ]
)
mydf = sqlContext.createDataFrame(
    [
        [6, 'SD', [[29916, 3], [156570, 4], [245934, 1], [329748, 8]]],
        [3, 'CO', [[524586, 2], [1548, 3], [527220, 1]]],
    ],
    dfSchema,
)
mydf.show(truncate=False)
+--------+-----+---------------------------------------------------------------------------------------------------+
|WindowID|State|Details |
+--------+-----+---------------------------------------------------------------------------------------------------+
|6 |SD |[WrappedArray(29916, 3), WrappedArray(156570, 4), WrappedArray(245934, 1), WrappedArray(329748, 8)]|
|3 |CO |[WrappedArray(524586, 2), WrappedArray(1548, 3), WrappedArray(527220, 1)] |
+--------+-----+---------------------------------------------------------------------------------------------------+
def def_sort(x):
    """Return the inner arrays of *x* ordered by their second element, descending.

    Intended as the body of a Spark UDF over the ``Details`` column
    (``array<array<int>>``). Every inner array must have at least two elements.

    Args:
        x: A sequence of indexable items, e.g. ``[[29916, 3], [156570, 4]]``,
            or ``None`` when the column value is null.

    Returns:
        A new list holding the same inner arrays, sorted by index 1 in
        descending order. Items with equal keys keep their original relative
        order, because ``sorted`` stays stable even with ``reverse=True``.
        Returns ``None`` if *x* is ``None``.
    """
    # The schema declares Details as nullable. A null cell arrives here as
    # None, and sorted(None) would raise TypeError and fail the whole task.
    if x is None:
        return None
    return sorted(x, key=lambda pair: pair[1], reverse=True)
# Wrap def_sort as a UDF that returns the same nested integer-array type as Details.
udf_sort = udf(def_sort, returnType=ArrayType(ArrayType(IntegerType())))
# "windowID" resolves to the 'WindowID' column only because Spark SQL column
# resolution is case-insensitive by default (spark.sql.caseSensitive=false).
(
    mydf
    .select("windowID", "State", udf_sort("Details"))
    .show(truncate=False)
)
+--------+-----+---------------------------------------------------------------------------------------------------+
|windowID|State|PythonUDF#def_sort(Details) |
+--------+-----+---------------------------------------------------------------------------------------------------+
|6 |SD |[WrappedArray(329748, 8), WrappedArray(156570, 4), WrappedArray(29916, 3), WrappedArray(245934, 1)]|
|3 |CO |[WrappedArray(1548, 3), WrappedArray(524586, 2), WrappedArray(527220, 1)] |
+--------+-----+---------------------------------------------------------------------------------------------------+