2016-05-30 51 views
1

問題陳述火花 - 通過輸出從組中刪除CompactBuffer(RDD)

需要分組的RDD

輸入

Header1^Header2 
A^4B 
A^11A 
B^7A 
C^6DF 
C^7DS 

後火花輸出(刪除CompactBuffer)格式化所需輸出

(A,(4B,11A)) 
(B,(7A)) 
(C,(6DF,7DS)) 

我有什麼企圖

val records = sc.textFIle("/user/chronicles/test.txt").map(x => { 
    val y = x.split("\\^",-1) 
    (y(0).trim(), 
    y(1).trim()) 
    }).groupBy(x => x._1) 

records.foreach(println) 

輸出

(A,CompactBuffer((4B,11A))) 
(B,CompactBuffer((7A))) 
(C,CompactBuffer((6DF,7DS))) 

在我的解決方案,我可以刪除 「CompactBuffer」 使用的foreach讀取每個元素,然後替換單詞和額外使用替換命令的符號

是否有任何其他方式可用於形成數據。

注意: 我跟了: 「how to remove compactbuffer in spark output」 - mkString在這種情況下

回答

2

沒有工作,如果我正確理解你的問題,在這裏你去:

val data = sc.parallelize(Seq("Header1^Header2", "A^4B", "A^11A", "B^7A", "C^6DF", "C^7DS")) 
      .map(x => { 
       val y = x.split("\\^", -1) 
      (y(0).trim(), y(1).trim()) 
      }).groupBy(x => x._1).mapValues(_.map(_._2).mkString("(",",",")")) 

data.collect.foreach(println) 
// (A,(4B,11A)) 
// (B,(7A)) 
// (C,(6DF,7DS)) 
// (Header1,(Header2)) 

要刪除的頭,你可以使用過濾器。我不確定這是否是這裏的問題。如果是這樣,請評論,以便我可以更正它。

+1

謝謝@eliasah - 它的工作:) 增加 - mapValues(_。map(_._ 2).mkString(「(」,「,」,「)」))到我現有的代碼。 – Debaditya