2016-09-08 52 views
0

我想觀察火花流如何使用DStream內的RDD加入兩個DStreams,但看到奇怪的結果,這是令人困惑。火花流加入奇怪的結果

在我的代碼中,我從套接字流中收集數據,通過某種邏輯將它們分成2個PairedDStreams。爲了收集一些批次加入,我創建了一個窗口來收集最後三批。然而,加入的結果是無能爲力的。請幫助我理解。

object Join extends App { 

    val conf = new SparkConf().setMaster("local[4]").setAppName("KBN Streaming") 
    val sc = new SparkContext(conf) 
    sc.setLogLevel("ERROR") 

    val BATCH_INTERVAL_SEC = 10 

    val ssc = new StreamingContext(sc, Seconds(BATCH_INTERVAL_SEC)) 
    val lines = ssc.socketTextStream("localhost", 8091) 

    //println(s"lines.slideDuration : ${lines.slideDuration}") 

    //lines.print() 
    val ds = lines.map(x => x) 

    import scala.util.Random 
    val randNums = List(1, 2, 3, 4, 5, 6) 

    val less = ds.filter(x => x.length <= 2) 
    val lessPairs = less.map(x => (Random.nextInt(randNums.size), x)) 
    lessPairs.print 

    val greater = ds.filter(x => x.length > 2) 
    val greaterPairs = greater.map(x => (Random.nextInt(randNums.size), x)) 
    greaterPairs.print 

    val join = lessPairs.join(greaterPairs).window(Seconds(30), Seconds(30)) 
    join.print 

    ssc.start 
    ssc.awaitTermination 
} 

測試結果:

---------------------------------- ---------時間:1473344240000毫秒 ------------------------------------ -------(1,b)(4,s)

----------------------------- --------------時間:1473344240000毫秒 ------------------------------- ------------(5,333)

------------------------------- ------------ Ti me:1473344250000 ms -------------------------------------------(2 ,x)

-------------------------------------------時間:1473344250000毫秒 -------------------------------------------(4 ,)

-------------------------------------------時間:1473344260000毫秒 -------------------------------------------(2 ,a)(0,b)

-------------------------------------- -----時間:1473344260000 ms ---------------------------------------- ---(2,ten)(1,one)(3,2)

------------------------------------------- Time:1473344260000 ms -------------------------------------------(4,(b,兩個))

回答

0

當連接被調用時,兩個RDD再次被重新計算,因此它們將包含與打印時顯示的值不同的值。因此,我們需要在第一次計算兩個RDD時進行緩存,因此稍後調用聯接時將使用相同的值(而不是再次重新計算兩個RDD)。我試過這個在多個例子,它工作正常。我錯過了Spark的基本核心概念。從「學習型星火」書

0

摘錄:

持久性(高速緩存)
如前所述,星火RDDS懶洋洋地評估,有時我們可能希望使用相同的RDD多次。如果我們天真地做到這一點,每次我們在RDD上調用一個動作時,Spark都會重新計算RDD及其所有依賴項。