我手頭有org.apache.spark.sql.DataFrame = [id:bigint,name:string] 其中的示例數據看起來像像Apache Spark - 如何使用groupBy groupByKey組成一個(Key,List)對
(1, 「City1」) (2, 「請分享幫助」) (1, 「CityX」) (4 「CityZ」) (2, 「CityN」)
我試圖形成像
一個輸出1,( 「City1」, 「CityX」) 2,( 「請分享幫助」, 「CityN」) 4,( 「CityZ」)
我嘗試以下變種
df.groupByKey.mapValues(_.toList).show(20, false)
df.groupBy("id").show(20, false)
df.rdd.groupByKey.mapValues(_.toList).show(20, false)
df.rdd.groupBy("id").show(20, false)
他們都抱怨要麼GROUPBY或groupByKey是含糊不清或找不到方法的錯誤。任何幫助表示讚賞。
我試圖張貼在Spark Group By Key to (Key,List) Pair的解決方案,但是這並沒有爲我工作,它失敗,出現以下錯誤:
<console>:88: error: overloaded method value groupByKey with alternatives:
[K](func: org.apache.spark.api.java.function.MapFunction[org.apache.spark.sql.Row,K], encoder: org.apache.spark.sql.Encoder[K])org.apache.spark.sql.KeyValueGroupedDataset[K,org.apache.spark.sql.Row] <and>
[K](func: org.apache.spark.sql.Row => K)(implicit evidence$3: org.apache.spark.sql.Encoder[K])org.apache.spark.sql.KeyValueGroupedDataset[K,org.apache.spark.sql.Row]
cannot be applied to()
感謝。
編輯:
我曾嘗試以下操作:
val result = df.groupBy("id").agg(collect_list("name"))
這給
org.apache.spark.sql.DataFrame = [id: bigint, collect_list(node): array<string>]
我不知道如何使用此collect_list類型..我想這轉儲通過做文件
result.rdd.coalesce(1).saveAsTextFile("test")
,我看到以下內容
[1, WrappedArray(City1, CityX)]
[2, WrappedArray(City3, CityN)]
[4, WrappedArray(CityZ)]
如何將此內容轉儲爲以下內容?
[1, (City1, CityX)]
[2,(City3, CityN)]
[4,(CityZ)]
請注意,爲了在使用combineByKey時獲得最佳結果,它有助於數據是否被分區 – SiLaf
謝謝你會試用它。我也偶然發現了aggregateByKey()。從DF我可以做foreachRDD並在RDD上使用上述建議的方法 –
aggregateByKey()也應該工作。我總是使用combineByKey作爲習慣的問題,因爲它更一般,所以可以應用於更多的情況,但這只是我。 :) – SiLaf