Spark Streaming - create a temp view in one stream and use it in another

I'm trying to run two DStreams: in the first, I register the generated DataFrame as a temp view, and then use it in the other DStream, like this:
dstream1.foreachRDD { rdd =>
  import org.apache.spark.sql._
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate
  import spark.implicits._
  val records = rdd.toDF("record")
  records.createOrReplaceTempView("records")
}
dstream2.foreachRDD { rdd =>
  import org.apache.spark.sql._
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate
  import spark.implicits._
  val records2 = rdd.toDF("record2")
  val oldRecord = spark.table("records")
  records2.join(oldRecord).write.json(...)
}
streamingContext.remember(Seconds(60))
streamingContext.start()
streamingContext.awaitTermination()
I keep getting an org.apache.spark.sql.catalyst.analysis.NoSuchTableException,
so clearly I'm not doing this correctly.
Is there a way to do this?
Thanks!
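For what it's worth, a sketch of one common alternative: since the two `foreachRDD` calls run independently, there is no guarantee that the temp view has been registered before the second stream tries to read it. `DStream.transformWith` pairs up the RDDs of the two streams within the same batch, so the join can happen in one place with no ordering race. This is untested, and the element type `String` and the variable names are assumptions, not from the original post:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Sketch: join the two streams batch-by-batch instead of passing data
// between them through a temp view.
val joined = dstream1.transformWith(dstream2, (rdd1: RDD[String], rdd2: RDD[String]) => {
  val spark = SparkSession.builder.config(rdd1.sparkContext.getConf).getOrCreate()
  import spark.implicits._
  val records  = rdd1.toDF("record")
  val records2 = rdd2.toDF("record2")
  // transformWith must return an RDD, so convert the joined DataFrame back.
  records.join(records2).rdd
})
```

Each output batch then contains the join of the corresponding batches of the two input streams, and the `NoSuchTableException` cannot occur because no table lookup is involved.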