1
我可以執行連接兩個星火DStreams像:星火:抗連接兩個DStreams
val joinStream = stream1.join(stream2)
現在,如果我需要過濾掉所有未加入的記錄。基本上,就像stream1.anti-join(stream2)
。這可能以某種方式嗎?
謝謝,感謝任何幫助!
我可以執行連接兩個星火DStreams像:星火:抗連接兩個DStreams
val joinStream = stream1.join(stream2)
現在,如果我需要過濾掉所有未加入的記錄。基本上,就像stream1.anti-join(stream2)
。這可能以某種方式嗎?
謝謝,感謝任何幫助!
假如你想知道這些:
val rdd1 = sc.parallelize(Array(
(1, "one"),
(2, "twow"),
(3, "three"),
(4, "four"),
(5, "five")
))
val rdd2 = sc.parallelize(Array(
(1, "otherOne"),
(4, "otherFour"),
(5,"otherFive"),
(6,"six"),
(7,"seven")
))
val antiJoined = rdd1.fullOuterJoin(rdd2).filter(r => r._2._1.isEmpty || r._2._2.isEmpty)
antiJoined.collect foreach println
(6,(None,Some(six)))
(2,(Some(twow),None))
(3,(Some(three),None))
(7,(None,Some(seven)))
我不知道我明白你什麼意思通過反連接 – eliasah
其中有一個共同的核心權利的記錄之間的連接發生的呢?我需要來自兩個流沒有共同的JOIN鍵的所有記錄。 – void
類似於http://2.bp.blogspot.com/-9xB6dMw3mcY/UIGn0glldYI/AAAAAAAAAEo/H8AkcRYvUHk/s1600/sql-left-outer-join-where-table-is-null-or-table-is-null。 png? – eliasah