2

我有一個火花流式傳輸用例,我計劃在每個執行器上廣播和緩存數據集。流中的每個微批將從RDD創建一個數據框並加入批處理。我下面給出的測試代碼將執行每批的廣播操作。有沒有辦法只播一次?與廣播聯接的火花流式傳輸

val testDF = sqlContext.read.format("com.databricks.spark.csv") 
       .schema(schema).load("file:///shared/data/test-data.txt") 

val lines = ssc.socketTextStream("DevNode", 9999) 

lines.foreachRDD((rdd, timestamp) => { 
    val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, l(1))).toDF() 
    val resultDF = recordDF.join(broadcast(testDF), "Age") 
    resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp) 
    } 

對於每個批次都讀取該文件並進行廣播。

16/02/18 12:24:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28 
16/02/18 12:24:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27 

16/02/18 12:25:00 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28 
16/02/18 12:25:00 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27 

對廣播數據集的任何建議只有一次?

回答

0

它看起來像現在廣播的表不被重複使用。請參閱:SPARK-3863

對廣播外foreachRDD環:

val testDF = broadcast(sqlContext.read.format("com.databricks.spark.csv") 
.schema(schema).load(...)) 

lines.foreachRDD((rdd, timestamp) => { 
    val recordDF = ??? 
    val resultDF = recordDF.join(testDF, "Age") 
    resultDF.write.format("com.databricks.spark.csv").save(...) 
} 

+0

我試過這種方法也和預期不會播出。可能是因爲foreachRDD是在司機的情況下執行的。順便說一句,我們必須在join語句中使用testDF.value。我認爲這是一個錯字。 – Cheeko

+0

謝謝! 請注意,sc.broadcast(someDataFrame)會在播放之前將數據傳送給驅動程序,還是會執行每個執行程序的bittorrent風格廣播?我總是使用SQL的廣播提示。不知道有什麼區別。 – Cheeko

+0

這不起作用。我的意思是你可以播放一個'DataFrame'(畢竟它是'Serializable'),但是你不能在DDS上嵌套操作。你可以簡單地收集,轉換成比'Array [Row]'更有用的東西,並廣播本地數據結構。然後只需使用UDF。 – zero323