2
我有一個火花流式傳輸用例,我計劃在每個執行器上廣播和緩存數據集。流中的每個微批將從RDD創建一個數據框並加入批處理。我下面給出的測試代碼將執行每批的廣播操作。有沒有辦法只播一次?與廣播聯接的火花流式傳輸
val testDF = sqlContext.read.format("com.databricks.spark.csv")
.schema(schema).load("file:///shared/data/test-data.txt")
val lines = ssc.socketTextStream("DevNode", 9999)
lines.foreachRDD((rdd, timestamp) => {
val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, l(1))).toDF()
val resultDF = recordDF.join(broadcast(testDF), "Age")
resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
}
對於每個批次都讀取該文件並進行廣播。
16/02/18 12:24:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
16/02/18 12:24:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27
16/02/18 12:25:00 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
16/02/18 12:25:00 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27
對廣播數據集的任何建議只有一次?
我試過這種方法也和預期不會播出。可能是因爲foreachRDD是在司機的情況下執行的。順便說一句,我們必須在join語句中使用testDF.value。我認爲這是一個錯字。 – Cheeko
謝謝! 請注意,sc.broadcast(someDataFrame)會在播放之前將數據傳送給驅動程序,還是會執行每個執行程序的bittorrent風格廣播?我總是使用SQL的廣播提示。不知道有什麼區別。 – Cheeko
這不起作用。我的意思是你可以播放一個'DataFrame'(畢竟它是'Serializable'),但是你不能在DDS上嵌套操作。你可以簡單地收集,轉換成比'Array [Row]'更有用的東西,並廣播本地數據結構。然後只需使用UDF。 – zero323