SparkSession
.builder
.master("local[*]")
.config("spark.sql.warehouse.dir", "C:/tmp/spark")
.config("spark.sql.streaming.checkpointLocation", "C:/tmp/spark/spark-checkpoint")
.appName("my-test")
.getOrCreate
.readStream
.schema(schema)
.json("src/test/data")
.cache // <- removing this call makes the example work as expected
.writeStream
.start
.awaitTermination
Running this example on Spark 2.1.0 produces an error. Without the .cache option it works as expected, but with the .cache option I get the exception below. Why does using cache on a streaming Dataset fail with "AnalysisException: Queries with streaming sources must be executed with writeStream.start()"?
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[src/test/data]
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:196)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:33)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:33)
        at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:58)
        at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:69)
        at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:69)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:73)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:73)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
        at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:102)
        at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:65)
        at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:89)
        at org.apache.spark.sql.Dataset.persist(Dataset.scala:2479)
        at org.apache.spark.sql.Dataset.cache(Dataset.scala:2489)
        at org.me.App$.main(App.scala:23)
        at org.me.App.main(App.scala)
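For reference, this is a minimal, self-contained sketch of the same pipeline with the .cache call commented out, which is the variant that works as expected. The object name `CacheOnStream` and the placeholder `schema` are my own assumptions; substitute a schema matching your JSON files under src/test/data.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StructType}

object CacheOnStream {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("my-test")
      .config("spark.sql.warehouse.dir", "C:/tmp/spark")
      .config("spark.sql.streaming.checkpointLocation", "C:/tmp/spark/spark-checkpoint")
      .getOrCreate()

    // Placeholder schema (assumption); replace with the real one.
    val schema = new StructType().add("id", LongType)

    val stream = spark.readStream
      .schema(schema)
      .json("src/test/data")

    // stream.cache() // <- uncommenting this single call triggers the
    //                //    AnalysisException shown in the stack trace above

    stream.writeStream
      .format("console") // an explicit sink, here the console sink
      .start()
      .awaitTermination()
  }
}
```

The exception is thrown at Dataset.cache, before writeStream.start is ever reached, because caching eagerly builds and checks a batch query plan over a streaming source.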
Any ideas?
Sorry, but I don't think that simply not using cache is the solution. –
Martin, feel free to join the discussion in [the comments on SPARK-20927](https://issues.apache.org/jira/browse/SPARK-20927?focusedCommentId=16334363&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16334363) about the need for caching on streaming computations. – mathieu