2016-04-15 62 views
1

我可以執行連接兩個星火DStreams像:星火:抗連接兩個DStreams

val joinStream = stream1.join(stream2) 

現在,如果我需要過濾掉所有未加入的記錄。基本上,就像stream1.anti-join(stream2)。這可能以某種方式嗎?

謝謝,感謝任何幫助!

+0

我不知道我明白你什麼意思通過反連接 – eliasah

+0

其中有一個共同的核心權利的記錄之間的連接發生的呢?我需要來自兩個流沒有共同的JOIN鍵的所有記錄。 – void

+1

類似於http://2.bp.blogspot.com/-9xB6dMw3mcY/UIGn0glldYI/AAAAAAAAAEo/H8AkcRYvUHk/s1600/sql-left-outer-join-where-table-is-null-or-table-is-null。 png? – eliasah

回答

2

假如你想知道這些:

val rdd1 = sc.parallelize(Array(
    (1, "one"), 
    (2, "twow"), 
    (3, "three"), 
    (4, "four"), 
    (5, "five") 
)) 
val rdd2 = sc.parallelize(Array(
    (1, "otherOne"), 
    (4, "otherFour"), 
    (5,"otherFive"), 
    (6,"six"), 
    (7,"seven") 
)) 

val antiJoined = rdd1.fullOuterJoin(rdd2).filter(r => r._2._1.isEmpty || r._2._2.isEmpty) 

antiJoined.collect foreach println 
(6,(None,Some(six))) 
(2,(Some(twow),None)) 
(3,(Some(three),None)) 
(7,(None,Some(seven)))