10
SparkSession 
    .builder 
    .master("local[*]") 
    .config("spark.sql.warehouse.dir", "C:/tmp/spark") 
    .config("spark.sql.streaming.checkpointLocation", "C:/tmp/spark/spark-checkpoint") 
    .appName("my-test") 
    .getOrCreate 
    .readStream 
    .schema(schema) 
    .json("src/test/data") 
    .cache 
    .writeStream 
    .start 
    .awaitTermination 

This example fails when run on Spark 2.1.0. Without .cache it works as expected, but with .cache I get the exception below. Why does using cache on a streaming Dataset fail with "AnalysisException: Queries with streaming sources must be executed with writeStream.start()"?

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[src/test/data]
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:196)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:33)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:33)
    at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:58)
    at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:69)
    at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:67)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:73)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:73)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
    at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:102)
    at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:65)
    at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:89)
    at org.apache.spark.sql.Dataset.persist(Dataset.scala:2479)
    at org.apache.spark.sql.Dataset.cache(Dataset.scala:2489)
    at org.me.App$.main(App.scala:23)
    at org.me.App.main(App.scala)

Any ideas?

+1

Sorry, but I don't think simply not using cache is the solution. –

+1

Martin, feel free to join the discussion on [SPARK-20927](https://issues.apache.org/jira/browse/SPARK-20927?focusedCommentId=16334363&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16334363) about the need for caching on streaming computations – mathieu

Answer

10

Your (very interesting) case comes down to the following line (that you can execute in spark-shell):

scala> :type spark 
org.apache.spark.sql.SparkSession 

scala> spark.readStream.text("files").cache 
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();; 
FileSource[files] 
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297) 
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36) 
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34) 
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127) 
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:34) 
    at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:63) 
    at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:74) 
    at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:72) 
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78) 
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78) 
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84) 
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80) 
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89) 
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89) 
    at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:104) 
    at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:68) 
    at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:92) 
    at org.apache.spark.sql.Dataset.persist(Dataset.scala:2603) 
    at org.apache.spark.sql.Dataset.cache(Dataset.scala:2613) 
    ... 48 elided 

The reason for this turns out to be quite simple to explain (no pun intended with Spark SQL's explain).

spark.readStream.text("files") creates a so-called streaming Dataset:

scala> val files = spark.readStream.text("files") 
files: org.apache.spark.sql.DataFrame = [value: string] 

scala> files.isStreaming 
res2: Boolean = true 

Streaming Datasets are the foundation of Spark SQL's Structured Streaming.

As you may have already read in Structured Streaming's Quick Example:

and then start the streaming computation using start().

Quoting the scaladoc of DataStreamWriter's start:

start(): StreamingQuery Starts the execution of the streaming query, which will continually output results to the given path as new data arrives.

So, you have to use start (or foreach) to start the execution of the streaming query. You knew that already.
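For reference, here is a minimal sketch of a streaming query that is started properly (the "files" source directory and the console sink are illustrative choices, not part of the original question):

val files = spark.readStream.text("files")

val query = files.writeStream
  .format("console")     // write each micro-batch to stdout; any supported sink works
  .outputMode("append")  // emit only newly arrived rows
  .start()               // this is what actually starts the streaming computation

query.awaitTermination()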

But...there are Unsupported Operations in Structured Streaming:

In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that immediately run queries and return results, which does not make sense on a streaming Dataset.

If you try any of these operations, you will see an AnalysisException like "operation XYZ is not supported with streaming DataFrames/Datasets".
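For illustration (assuming the same hypothetical "files" directory as above), an eager action such as count goes through the same checkForBatch path and should fail with the same message:

val files = spark.readStream.text("files")
// Expected to throw org.apache.spark.sql.AnalysisException:
// "Queries with streaming sources must be executed with writeStream.start()"
files.count()
// files.collect() would fail the same way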

That looks familiar, doesn't it?

cache is not in the list of unsupported operations, but that's because it has simply been overlooked (I reported SPARK-20927 to fix it).

cache should have been in the list, since it does execute a query before the query gets registered in Spark SQL's CacheManager.

Let's go deeper into the depths of Spark SQL...hold your breath...

cache is persist, and persist requests the current CacheManager to cache the query:

sparkSession.sharedState.cacheManager.cacheQuery(this) 

While caching the query, the CacheManager does execute it:

sparkSession.sessionState.executePlan(planToCache).executedPlan 

which we know is not allowed, since it is start (or foreach) that is supposed to do so.
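Putting the pieces together, the call chain reconstructed from the stack traces above looks roughly like this (simplified; method names are taken from the trace):

// Dataset.cache()
//   -> Dataset.persist()
//     -> CacheManager.cacheQuery(this)
//       -> QueryExecution.executedPlan              // eagerly builds the physical plan
//         -> QueryExecution.assertSupported()
//           -> UnsupportedOperationChecker.checkForBatch(analyzedPlan)
//             -> throwError("Queries with streaming sources must be executed with writeStream.start()")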

Problem solved!

+1

I thought this was a bug, which is why I even reported it earlier at https://issues.apache.org/jira/browse/SPARK-20865; I just needed confirmation for my puzzle. Thank you. –

+0

Linking to master is not really relevant, since the target code may change. I think that is what happened to your links. – crak

+0

@crak Correct. I should not have used links to master. What do you think would be better? Links to a specific past version, but I could not figure out how to do that on GitHub. Mind offering some help? I'd appreciate it. –
