2017-02-02 52 views
5

從火花結構流文件: 「此檢查點的位置必須是在HDFS兼容的文件系統的路徑,並且可以開始一個時被設置爲在DataStreamWriter一個選項查詢「。阿帕奇火花(結構化數據流):S3檢查點支持

果然,檢查點設置到S3路徑拋出:

17/01/31 21:23:56 ERROR ApplicationMaster: User class threw exception: java.lang.IllegalArgumentException: Wrong FS: s3://xxxx/fact_checkpoints/metadata, expected: hdfs://xxxx:8020 
java.lang.IllegalArgumentException: Wrong FS: s3://xxxx/fact_checkpoints/metadata, expected: hdfs://xxxx:8020 
     at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:652) 
     at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:194) 
     at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:106) 
     at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305) 
     at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301) 
     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) 
     at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1301) 
     at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1430) 
     at org.apache.spark.sql.execution.streaming.StreamMetadata$.read(StreamMetadata.scala:51) 
     at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:100) 
     at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232) 
     at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269) 
     at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262) 
     at com.roku.dea.spark.streaming.FactDeviceLogsProcessor$.main(FactDeviceLogsProcessor.scala:133) 
     at com.roku.dea.spark.streaming.FactDeviceLogsProcessor.main(FactDeviceLogsProcessor.scala) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:498) 
     at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637) 
17/01/31 21:23:56 INFO SparkContext: Invoking stop() from shutdown hook 

一對夫婦的問題在這裏:

  1. 爲什麼S3不支持作爲一個檢查點目錄(正常的火花流支持這個)?什麼使文件系統「符合HDFS」?
  2. 我強烈使用HDFS(因爲集羣可以一直上下),並使用s3作爲保存所有數據的地方 - 在這樣的設置中存儲用於結構化流數據的檢查點數據的建議是什麼?
+0

純粹的猜測在這裏,但你有沒有嘗試過s3n或s3a(最好是s3a)協議? – ImDarrenG

+0

絕對值得和嘗試,會試試看。 – Apurva

回答

2

什麼使FS HDFS「符合?」它是一個文件系統,具有在Hadoop FS specification中指定的行爲。對象存儲和FS之間的差被覆蓋在那裏,與關鍵點是「最終一致的對象存儲,而不追加或O(1)原子重命名不符合」

對於S3特別

  1. 這是不一致:在創建新的blob後,list命令通常不會顯示它。相同的刪除。
  2. 當斑被覆蓋或刪除,它可能需要一段時間才能消失
  3. 重命名()是通過複製來實現的,然後通過保存一切的位置,然後重命名刪除

星火流關卡它到檢查點目錄。這使得檢查點的時間與在S3中執行數據拷貝的時間成比例,該時間爲〜6-10MB/s。

當前的流媒體代碼不適合s3,雖然在某些時候我可能會解決這個問題,但是我沒有提交任何新的補丁,直到他們提交我的舊補丁。如果它只是被忽略,那麼我對這些東西的工作就毫無意義。

現在,做

  • 檢查點HDFS的一個,然後拷貝過來的結果
  • 檢查點位EBS的分配和連接到羣集
  • 檢查站S3,但有檢查點之間的長時間差距,以便檢查點的時間不會導致流式應用程序停機。

如果您正在使用EMR,您可以支付一個一致的,發電數據庫支持的S3的保費,這可以提供更好的一致性。但複製時間仍然相同,因此檢查點設置會一樣慢

+0

我們在檢查點到S3之間有40秒的時間間隔,而且我們偶爾還會檢查點問題,例如正在寫入臨時目錄,然後找不到。 –

+0

未找到檢查點可能是s3的一致性曲面:清單往往滯後於對象庫中的更改。通常你不會注意到,但有時會出現。爲元數據存儲使用發電機應該可行......至少如果不行的話,它一直在錯誤地實施 –

4

這是一個已知的問題:https://issues.apache.org/jira/browse/SPARK-19407

應固定在下一版本中。作爲解決方法,您可以使用--conf spark.hadoop.fs.defaultFS=s3將默認文件系統設置爲s3。

+0

不要以爲這已經解決了。仍然無法在S3上檢查點結構化流(spark 2.1.1)。 檢查點恢復失敗: 7/06/29 00:29:00信息StateStoreCoordinatorRef:已註冊StateStoreCoordinator端點 org.apache.spark.sql.AnalysisException:此查詢不支持從檢查點位置恢復。 – Apurva

+0

這是一個不同的問題。你使用不支持恢復的「內存」或「控制檯」嗎? – zsxwing