2016-07-29 35 views
0

給定RDD[(A, B)],其中AB之間存在多對多關係,如何將關係的交集分組?在Spark中合併相交多對多關係

即,如果可以通過一個或多個B s從一個A到另一個A繪製關係,則應該對它們進行分組。同樣,B s可以通過A s進行分組。

例如,集合:

(1, 'a') 
(2, 'a') 
(2, 'b') 
(1, 'c') 
(3, 'f') 
(4, 'f') 
(5, 'g') 

應組成

([1,2], ['a','b','c']) 
([3,4], ['f']) 
([5], ['g']) 

我可以使用groupByKey獲得

(1, ['a', 'c']) 
(2, ['a', 'b']) 
(3, ['f']) 
(4, ['f']) 
(5, ['g']) 

並且還

('a', [1, 2]) 
('b', [2]) 
('c', [1]) 
('f', [3,4]) 
('g', [5]) 

但我不知道在哪裏把它從這裏開始。

+0

RDD不支持這樣的行動在箱子外面!我認爲,第一步是正確的。在任何groupBy之後,您需要根據需要對列表進行摺疊。 – rakesh

回答

0
object ManyToMany extends App { 
    val m = List((1, 'a'), 
    (2, 'a'), 
    (2, 'b'), 
    (1, 'c'), 
    (3, 'f'), 
    (4, 'f'), 
    (5, 'g')) 

    val mInt: Map[Int, Set[Char]] = m.groupBy(_._1).map { case (a, b) => a -> b.map { case (c, d) => d }.toSet } 
    val mChar: Map[Char, Set[Int]] = m.groupBy(_._2).map { case (a, b) => a -> b.map { case (c, d) => c }.toSet } 
    def isIntersect[A](as: List[Set[A]], bs: Set[A]): List[Set[A]] = as.filter { x => x.exists { y => bs.contains(y) } } 
    val c = m.map { case (a, b) => mInt(a) }.foldLeft(List.empty[Set[Char]]) { 
    case (sum, item) => 
     isIntersect(sum, item) match { 
     case Nil => item :: sum 
     case List(x) => 
      sum.filterNot(_ == x) ++ List(x ++ item) 
     } 
    } 
    val d = c.map(x => (x, x.map(mChar(_)).foldLeft(Set.empty[Int]) {  case (sum, i) => sum ++ i })) 
    println(d) 
} 
result: 
List((Set(g),Set(5)), (Set(a, c, b),Set(1, 2)), (Set(f),Set(3, 4)))