2017-09-13 49 views

回答

3

讀取新文件下面是該用例的完整的解決方案:

如果您在獨立模式下運行。你可以增加驅動器內存:

bin/spark-shell --driver-memory 4G 

無需設置執行內存在單機模式下執行驅動程序內運行。

由於完成的@ T.Gaweda的解決方案,找到下面的解決方案:

val userSchema = new StructType().add("name", "string").add("age", "integer") 
val csvDF = spark 
    .readStream 
    .option("sep", ";") 
    .schema(userSchema)  // Specify schema of the csv files 
    .csv("/path/to/directory") // Equivalent to format("csv").load("/path/to/directory") 

csvDf.writeStream.format("console").option("truncate","false").start() 

現在,你在目錄中的數據幀操作添加任何CSV文件中的火花會持續監控指定的目錄,並儘快「csvDF」將在該文件上執行。

注意:如果你想火花,則InferSchema你必須先設置以下配置:

spark.sqlContext.setConf("spark.sql.streaming.schemaInferenc‌​e","true") 

其中火花是你的火花會議。

1

寫在官方documentation你應該使用「文件」來源:

文件源 - 讀取寫入目錄中的數據流文件。支持的文件格式是文本,csv,json,parquet。查看DataStreamReader接口的文檔以獲取更新的列表,以及每種文件格式的支持選項。請注意,文件必須原子地放置在給定的目錄中,在大多數文件系統中,這可以通過文件移動操作來實現。從資料爲準

代碼示例:

// Read all the csv files written atomically in a directory 
val userSchema = new StructType().add("name", "string").add("age", "integer") 
val csvDF = spark 
    .readStream 
    .option("sep", ";") 
    .schema(userSchema)  // Specify schema of the csv files 
    .csv("/path/to/directory") // Equivalent to format("csv").load("/path/to/directory") 

如果不指定觸發,星火會盡快

+0

我有問題。這項工作是否也適用於閱讀avro文件?這是否支持谷歌雲存儲,也就是說,我想類似地處理我的gcs桶中出現的新文件?這種方法容錯,即如果管道失敗,如何恢復,如何知道哪些文件被處理,哪些是新的? – user179156

+0

在text/json的情況下,如果我的流式管道失敗,新的流式管道如何知道從哪裏開始使用文件? – user179156

相關問題