我們正在開發一個Spark流式ETL應用程序,它將從Kafka獲取數據,應用必要的轉換並將數據加載到MongoDB中。從Kafka收到的數據是JSON格式。根據從MongoDB獲取的查找數據,轉換應用於RDD的每個元素(JSON字符串)。由於查找數據發生變化,我需要爲每個批處理間隔提取它。查找數據是使用MongoDB中的SqlContext.read讀取的。由於SqlContext不可序列化,所以我無法在DStream.transform中使用SqlContext.read,因此我無法將其廣播到工作節點。現在我嘗試使用DStream.foreachRDD,其中我從MongoDB獲取數據並將查找數據廣播給工作人員。 RDD元素上的所有轉換均在rdd.map閉包內執行,該閉包利用廣播數據並執行轉換並返回RDD。然後將RDD轉換爲數據框並寫入MongoDB。目前,這個應用程序運行速度很慢。 PS:如果我移動從DStream.foreachRDD中提取查找數據的代碼部分,並添加DStream.transform來應用轉換並讓DStream.foreachRDD僅將數據插入到MongoDB中,則性能非常好。但是採用這種方法,查找數據不會爲每個批處理間隔更新。使用外部數據轉換DStream RDD
我在尋求幫助以瞭解這是否是一種好方法,我正在尋找一些提高性能的指導。
下面是一個僞代碼
package com.testing
object Pseudo_Code {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("Pseudo_Code")
.setMaster("local[4]")
val sc = new SparkContext(sparkConf)
sc.setLogLevel("ERROR")
val sqlContext = new SQLContext(sc)
val ssc = new StreamingContext(sc, Seconds(1))
val mongoIP = "127.0.0.1:27017"
val DBConnectionURI = "mongodb://" + mongoIP + "/" + "DBName"
val bootstrap_server_config = "127.0.0.100:9092"
val zkQuorum = "127.0.0.101:2181"
val group = "streaming"
val TopicMap = Map("sampleTopic" -> 1)
val KafkaDStream = KafkaUtils.createStream(ssc, zkQuorum, group, TopicMap).map(_._2)
KafkaDStream.foreachRDD{rdd =>
if (rdd.count() > 0) {
//This lookup data has information required to perform transformation
//This information keeps changing, so the data should be fetched for every batch interval
val lookup1 = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", DBConnectionURI)
.option("spark.mongodb.input.collection", "lookupCollection1")
.load()
val broadcastLkp1 = sc.broadcast(lookup1)
val new_rdd = rdd.map{elem =>
val json: JValue = parse(elem)
//Foreach element in rdd, there are some values that should be looked up from the broadcasted lookup data
//"value" extracted from json
val param1 = broadcastLkp1.value.filter(broadcastLkp1.value("key")==="value").select("param1").distinct()
val param1ReplaceNull = if(param1.count() == 0){
"constant"
}
else{
param1.head().getString(0)
}
//create a new JSON with a different structure
val new_JSON = """"""
compact(render(new_JSON))
}
val targetSchema = new StructType(Array(StructField("key1",StringType,true)
,StructField("key2",TimestampType,true)))
val transformedDf = sqlContext.read.schema(targetSchema).json(new_rdd)
transformedDf.write
.option("spark.mongodb.output.uri",DBConnectionURI)
.option("collection", "tagetCollectionName")
.mode("append").format("com.mongodb.spark.sql").save()
}
}
// Run the streaming job
ssc.start()
ssc.awaitTermination()
}
}
這裏有一個有趣的問題。需要考慮的事情:你可以從你的kafka和你的mongoDB中流入嗎?如果是這樣的話,那麼你可以同時在兩個DStreams上工作。 –
@MichelLemay你有一個關於如何從mongoDB流式傳輸的例子。我可以試試看。現在,我可以按照https://stackoverflow.com/questions/37638519/spark-streaming-how-to-periodically-refresh-cached-rdd中提供的一些說明稍微向前邁進一步。我創建了一個DStream.foreachRDD,其中我重新加載了查找數據,然後創建了DStream.transform,其中使用了查找數據並返回一個新的RDD,然後創建了另一個將數據插入到mongoDB中的foreachRDD。這有效,但表現非常糟糕。 – Sid
您是否嘗試過在dataframe API中爲您的json轉換提供from_json函數?您可以嘗試結構化流式傳輸(如果您的驅動程序支持.writeStream)。 val msgSchema = Encoders.product [Message] .schema val ds = df .select(from_json($「value」.cast(「string」),msgSchema).as [Message]) – sgireddy