2015-04-15 77 views
3

將DStream RDD與批量RDD進行聯合/聯合/聯合使用的唯一方法是通過「transform」方法,該方法返回另一個DStream RDD,因此將在其末尾丟棄微批次。向DStream中的批量RDD添加新元素RDD

有沒有辦法將Dstream RDD與批RDD結合起來,從而生成包含DStream RDD和批RDD的元素的新批RDD。

並且一旦以上述方式創建了這樣的批RDD,它是否可以被其他DStream RDD用於例如因爲此時結果可能是另一個DStream RDD

有效地,上述功能將導致對批量RDD的元素進行週期性更新(附加) - 其他元素將繼續來自DStream RDD,這些元素將保持流式傳輸每個微批次。 還新到DSTREAM RDDS將能夠參加與這樣以前更新一批RDD和

東西幾乎產生結果DSTREAM RDD這樣可以updateStateByKey來實現,但有沒有辦法做到這一點這裏描述

回答

4

另一種方法是將批量輸入轉換爲DStream並將其與流式輸入結合。然後使用foreachRDD將其寫出來,這是您批量輸入其他作業的新增內容。

val batch = sc.textFile(...) 

val ssc = new StreamingContext(sc, Seconds(30)) 
val stream = ssc.textFileStream(...) 

import scala.collection.mutable 
val batchStream = ssc.queueStream(mutable.Queue.empty[RDD[String]], oneAtATime = false, defaultRDD = batch) 

val union = ssc.union(Seq(stream, batchStream)) 

union.print() 

union.foreachRDD { rdd => 
    // Delete previous, or use SchemaRDD with .insertInto(, overwrite = true) 
    rdd.saveTextFile(...) 
} 

ssc.start() 
ssc.awaitTermination() 
+0

快速的問題:我可以傳遞一個參照本發明的'mutable.Queue'到'queueStream()',然後任意地推RDDS到該隊列,並讓它們被合併到流? 如果不是那麼我想我可以通過調用'queueStream()'新的批處理RDD,然後'scc.union()'每次我想合併一個新的RDD來實現這個功能? –