2017-02-11 118 views
1

基本上,我使用單個Spark Streaming消費者[直接方法]從多個kafka主題使用數據。如何將RDT數量的DStream轉換爲單個RDD

val dStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet).map(_._2) 

批處理間隔是30 Seconds

我在這裏得到了幾個問題。

  1. 當我在DStream上調用foreachRDD時,DStream是否包含多個RDD而不是單個RDD?每個主題都會創建單獨的RDD?
  2. 如果是,我想將所有的RDD聯合到單個RDD,然後處理數據。我怎麼做?
  3. 如果我的處理時間超過批處理間隔,DStream是否會包含多個RDD?

我試圖聯合DStream RDDs到單個RDD使用下面的方式。首先是我的理解正確嗎?如果DStream總是返回單個RDD,則下面的代碼不是必需的。

示例代碼:

var dStreamRDDList = new ListBuffer[RDD[String]] 
dStream.foreachRDD(rdd => 
     { 
      dStreamRDDList += rdd 
     }) 
val joinedRDD = ssc.sparkContext.union(dStreamRDDList).cache() 

//THEN PROCESS USING joinedRDD 
//Convert joinedRDD to DF, then apply aggregate operations using DF API. 

回答

1

請問DSTREAM包含多個RDD的,而不是單RDD當我打電話foreachRDD上DSTREAM?每個主題將創建單獨的RDD?

不會。即使您有多個主題,在任何給定的批處理間隔內都會有一個RDD。

如果我的處理時間超過批處理間隔,DStream是否會包含多個RDD?

不,如果您的處理時間比批處理間隔長,那麼所有將要完成的工作是讀取主題偏移量。下一個批次的處理只有在前一個作業完成後纔會開始。

作爲一個方面說明,確保你真正需要使用foreachRDD,或者如果可能you're misusing the DStream API(免責聲明:我是帖子的作者)

+0

謝謝,我會讀您的文章和回來...: ) – Shankar

相關問題