我有這樣一段代碼:試圖瞭解火花流流
val lines: org.apache.spark.streaming.dstream.InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)
lines.foreachRDD { rdd =>
val df = cassandraSQLContext.read.json(rdd.map(x => x._2))
sparkStreamingService.run(df)
}
ssc.start()
ssc.awaitTermination()
我的理解是,foreachRDD在驅動程序級別發生的呢?所以基本上所有的代碼塊:
lines.foreachRDD { rdd =>
val df = cassandraSQLContext.read.json(rdd.map(x => x._2))
sparkStreamingService.run(df)
}
發生在驅動程序級別?所述sparkStreamingService.run(DF)方法基本上沒有對電流數據幀一些轉換,以產生一個新的數據幀,然後調用存儲數據幀到另一個卡桑德拉方法(在另一個罐)。 因此,如果這種情況都發生在驅動程序級別,我們沒有利用火花執行程序,我怎樣才能做到這一點,以便並行使用執行程序來並行處理RDD的每個分區
我的火花流服務運行方法:
var metadataDataframe = df.select("customer", "tableName", "messageContent", "initialLoadRunning").collect()
metadataDataframe.foreach(rowD => {
metaData = populateMetaDataService.populateSiteMetaData(rowD)
val headers = (rowD.getString(2).split(recordDelimiter)(0))
val fields = headers.split("\u0001").map(
fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
val listOfRawData = rowD.getString(2).indexOf(recordDelimiter)
val dataWithoutHeaders = rowD.getString(2).substring(listOfRawData + 1)
val rawData = sparkContext.parallelize(dataWithoutHeaders.split(recordDelimiter))
// val rawData = dataWithoutHeaders.split(recordDelimiter)
val rowRDD = rawData
.map(_.split("\u0001"))
.map(attributes => Row(attributes: _*))
val newDF = cassandraSQLContext.createDataFrame(rowRDD, schema)
dataFrameFilterService.processBasedOnOpType(metaData, newDF)
})
但在這裏:http://spark.apache.org/docs/latest/streaming-programming-guide.html,如果你向下滾動到他們使用forEachRdd的地方,他們有一個評論說,一個特定的聲明正在以@Ahmed司機 – Ahmed
執行我編輯它直接與比文檔 –
是,這樣是我的問題增加了更多的明確性解決這一問題,就收集。到現在爲止,由於我正在收集,記錄是按順序處理的,並且這些記錄中的每一個都會分發給執行者?然後刪除收集,所有的記錄將被並行處理而不是順序處理,是嗎? – Ahmed