2017-03-06 58 views
0

我試圖運行2個Dstreams,在第一個生成DataFrame中註冊df作爲tmp視圖,然後在另一個Dstream中使用它,如下所示:spark streaming - 在一個流中創建tmp視圖並在另一個流中使用

dstream1.foreachRDD { rdd => 
    import org.apache.spark.sql._ 
    val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate 
    import spark.implicits._ 
    import spark.sql 

    val records = rdd.toDF("record") 
    records.createOrReplaceTempView("records") 
} 
dstream2.foreachRDD { rdd => 
    import org.apache.spark.sql._ 
    val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate 
    import spark.implicits._ 
    import spark.sql 

    val records2 = rdd.toDF("record2") 
    val oldRecord = spark.table("records") 
    records2.join(oldRecod).write.json(...) 
} 
streamingContext.remember(Seconds(60)) 
    streamingContext.start() 
    streamingContext.awaitTermination() 

我一直在收到一個org.apache.spark.sql.catalyst.analysis.NoSuchTableException 所以顯然我沒有做正確的事情。

有沒有辦法做到這一點?

謝謝!

回答

0

這實際上工作, 問題是,當在本地進行測試時,您需要爲計算留下額外的核心,然後從流中引入數據。

我使用了master = local [2],因此每個核心都用於處理每個流,而不用任何其他操作。 一旦我將其更改爲master = local [4],它可以正常工作

相關問題