2017-06-18 26 views
0

後創建的文件有什麼辦法來配置textFileStream源,它將處理任何文件添加到源目錄,無論文件的創建時間呢?星火流僅流流初始化時間

爲了說明這個問題,我創建了一個使用textFileStream作爲源並打印流內容到控制檯基本星火流應用。當在運行應用程序之前創建的現有文件被複制到源目錄中時,沒有任何內容被打印到控制檯。當應用程序開始運行後創建的文件被複制到源目錄時,將打印文件內容。以下是我的代碼供參考。

val conf = new SparkConf().setAppName("Streaming Test") 
          .setMaster("local[*]") 

val spark = new SparkContext(conf) 
val ssc = new StreamingContext(spark, Seconds(5)) 

val fileStream = ssc.textFileStream("/stream-source") 

val streamContents = fileStream.flatMap(_.split(" ")) 

streamContents.print() 

回答

1

這是FileInputDStream的記錄行爲。

如果我們想消耗在該目錄中已有的文件,我們可以使用Spark API來加載這些文件,並應用我們所需的邏輯給他們。

val existingFiles = sparkContext.textFile(path) 

val existingFilesDS = sparkSession.read.text(path) 

然後後,設置和開始流的邏輯。 我們甚至可以使用已存在的文件的數據在換新的處理。

+0

在源代碼中的文檔是有些模棱兩可 - '在這種情況下,「新」是指當時period'中變得可見,以饗讀者文件。這使得看起來好像該文件不需要在時間段開始後創建,而是僅在該時間段開始後才被提供給源。 –