1
基本上,我使用單個Spark Streaming消費者[直接方法]從多個kafka主題使用數據。如何將RDT數量的DStream轉換爲單個RDD
val dStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet).map(_._2)
批處理間隔是30 Seconds
。
我在這裏得到了幾個問題。
- 當我在DStream上調用foreachRDD時,DStream是否包含多個RDD而不是單個RDD?每個主題都會創建單獨的RDD?
- 如果是,我想將所有的RDD聯合到單個RDD,然後處理數據。我怎麼做?
- 如果我的處理時間超過批處理間隔,DStream是否會包含多個RDD?
我試圖聯合DStream RDDs到單個RDD使用下面的方式。首先是我的理解正確嗎?如果DStream總是返回單個RDD,則下面的代碼不是必需的。
示例代碼:
var dStreamRDDList = new ListBuffer[RDD[String]]
dStream.foreachRDD(rdd =>
{
dStreamRDDList += rdd
})
val joinedRDD = ssc.sparkContext.union(dStreamRDDList).cache()
//THEN PROCESS USING joinedRDD
//Convert joinedRDD to DF, then apply aggregate operations using DF API.
謝謝,我會讀您的文章和回來...: ) – Shankar