2017-07-04 87 views
1

我很新,既Spark和Scale,可能真的需要一個提示來解決我的問題。所以,我有兩個DataFrames A(列編號和名稱)和B(列ID和文本)想加入他們的行列,組由ID和文本的所有行合併成一個字符串:將Spark DataFrame的行彙總爲字符串後的字符串

一個

+--------+--------+ 
|  id| name| 
+--------+--------+ 
|  0|  A| 
|  1|  B| 
+--------+--------+ 

+--------+ -------+ 
|  id| text| 
+--------+--------+ 
|  0|  one| 
|  0|  two| 
|  1| three| 
|  1| four| 
+--------+--------+ 

期望的結果:

+--------+--------+----------+ 
|  id| name|  texts| 
+--------+--------+----------+ 
|  0|  A| one two| 
|  1|  B|three four| 
+--------+--------+----------+ 

到目前爲止,我想以下幾點:

var C = A.join(B, "id") 
var D = C.groupBy("id", "name").agg(collect_list("text") as "texts") 

這工作得很好,除此之外,我的文本列是一個字符串數組而不是字符串。非常感謝您的幫助。

回答

3

我只是添加在你的一些小的功能給正確的解決方案,這是

A.join(B, Seq("id"), "left").orderBy("id").groupBy("id", "name").agg(concat_ws(" ", collect_list("text")) as "texts") 
0

這是很簡單的:

val bCollected = b.groupBy('id).agg(collect_list('text).as("texts") 
val ab = a.join(bCollected, a("id") == bCollected("id"), "left") 

第一數據幀是具有文本直接結果,B數據幀收集每個ID。然後你加入一個。 bCollected應該小於b本身,所以它會可能得到更好的洗牌時間

+0

這個問題有一個愚蠢的地方。 – eliasah