我希望Spark能夠持續監控目錄並在文件出現在該目錄中時使用spark.readStream讀取CSV文件。如何使用Spark結構化數據流連續監控目錄
請不要包含Spark Streaming的解決方案。我正在尋找一種通過使用spark結構化流式傳輸的方法。
我希望Spark能夠持續監控目錄並在文件出現在該目錄中時使用spark.readStream讀取CSV文件。如何使用Spark結構化數據流連續監控目錄
請不要包含Spark Streaming的解決方案。我正在尋找一種通過使用spark結構化流式傳輸的方法。
讀取新文件下面是該用例的完整的解決方案:
如果您在獨立模式下運行。你可以增加驅動器內存:
bin/spark-shell --driver-memory 4G
無需設置執行內存在單機模式下執行驅動程序內運行。
由於完成的@ T.Gaweda的解決方案,找到下面的解決方案:
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
.readStream
.option("sep", ";")
.schema(userSchema) // Specify schema of the csv files
.csv("/path/to/directory") // Equivalent to format("csv").load("/path/to/directory")
csvDf.writeStream.format("console").option("truncate","false").start()
現在,你在目錄中的數據幀操作添加任何CSV文件中的火花會持續監控指定的目錄,並儘快「csvDF」將在該文件上執行。
注意:如果你想火花,則InferSchema你必須先設置以下配置:
spark.sqlContext.setConf("spark.sql.streaming.schemaInference","true")
其中火花是你的火花會議。
寫在官方documentation你應該使用「文件」來源:
文件源 - 讀取寫入目錄中的數據流文件。支持的文件格式是文本,csv,json,parquet。查看DataStreamReader接口的文檔以獲取更新的列表,以及每種文件格式的支持選項。請注意,文件必須原子地放置在給定的目錄中,在大多數文件系統中,這可以通過文件移動操作來實現。從資料爲準
代碼示例:
// Read all the csv files written atomically in a directory
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
.readStream
.option("sep", ";")
.schema(userSchema) // Specify schema of the csv files
.csv("/path/to/directory") // Equivalent to format("csv").load("/path/to/directory")
如果不指定觸發,星火會盡快
我有問題。這項工作是否也適用於閱讀avro文件?這是否支持谷歌雲存儲,也就是說,我想類似地處理我的gcs桶中出現的新文件?這種方法容錯,即如果管道失敗,如何恢復,如何知道哪些文件被處理,哪些是新的? – user179156
在text/json的情況下,如果我的流式管道失敗,新的流式管道如何知道從哪裏開始使用文件? – user179156