2017-10-09 131 views
-1

我手頭有org.apache.spark.sql.DataFrame = [id:bigint,name:string] 其中的示例數據看起來像像Apache Spark - 如何使用groupBy groupByKey組成一個(Key,List)對

(1, 「City1」) (2, 「請分享幫助」) (1, 「CityX」) (4 「CityZ」) (2, 「CityN」)

我試圖形成像

一個輸出

1,( 「City1」, 「CityX」) 2,( 「請分享幫助」, 「CityN」) 4,( 「CityZ」)

我嘗試以下變種

df.groupByKey.mapValues(_.toList).show(20, false) 
df.groupBy("id").show(20, false) 
df.rdd.groupByKey.mapValues(_.toList).show(20, false) 
df.rdd.groupBy("id").show(20, false) 

他們都抱怨要麼GROUPBY或groupByKey是含糊不清或找不到方法的錯誤。任何幫助表示讚賞。

我試圖張貼在Spark Group By Key to (Key,List) Pair的解決方案,但是這並沒有爲我工作,它失敗,出現以下錯誤:

<console>:88: error: overloaded method value groupByKey with alternatives: 
    [K](func: org.apache.spark.api.java.function.MapFunction[org.apache.spark.sql.Row,K], encoder: org.apache.spark.sql.Encoder[K])org.apache.spark.sql.KeyValueGroupedDataset[K,org.apache.spark.sql.Row] <and> 
    [K](func: org.apache.spark.sql.Row => K)(implicit evidence$3: org.apache.spark.sql.Encoder[K])org.apache.spark.sql.KeyValueGroupedDataset[K,org.apache.spark.sql.Row] 
cannot be applied to() 

感謝。

編輯:

我曾嘗試以下操作:

val result = df.groupBy("id").agg(collect_list("name")) 

這給

org.apache.spark.sql.DataFrame = [id: bigint, collect_list(node): array<string>] 

我不知道如何使用此collect_list類型..我想這轉儲通過做文件

result.rdd.coalesce(1).saveAsTextFile("test") 

,我看到以下內容

[1, WrappedArray(City1, CityX)] 
[2, WrappedArray(City3, CityN)] 
[4, WrappedArray(CityZ)] 

如何將此內容轉儲爲以下內容?

[1, (City1, CityX)] 
[2,(City3, CityN)] 
[4,(CityZ)] 

回答

0

如果你有一對RDD,那麼你可以使用combineByKey()。要做到這一點,你必須傳遞3個方法作爲參數。

方法1採用String,例如「City1」作爲輸入,將這個字符串添加到空列表,並返回該列表 方法2需要一個字符串,例如「CityX」和由所創建的列表中的一先前的方法。將字符串添加到列表中並返回列表。方法3將2個列表作爲輸入。它返回一個新列表,其中包含來自2個參數列表的所有值

combineByKey將返回一個RDD>。

然而,在你的情況下,你開始與一個DataFrame,我沒有太多的經驗。我想你需要將它轉換爲RDD才能使用combineByKey()

+0

請注意,爲了在使用combineByKey時獲得最佳結果,它有助於數據是否被分區 – SiLaf

+0

謝謝你會試用它。我也偶然發現了aggregateByKey()。從DF我可以做foreachRDD並在RDD上使用上述建議的方法 –

+0

aggregateByKey()也應該工作。我總是使用combineByKey作爲習慣的問題,因爲它更一般,所以可以應用於更多的情況,但這只是我。 :) – SiLaf

相關問題