我在HDFS上有一個目錄,其中每個10分鐘複製一個文件(現有文件被覆蓋)。 我想用Spark流(1.6.0)讀取文件的內容,並將其用作參考數據將其加入其他流。使用Spark Streaming讀取fileStream
我設置了「記得窗口」 spark.streaming.fileStream.minRememberDuration
爲「600S」,並設置newFilesOnly到假,因爲 當我啓動應用程序我wan't來從HDFS的初始數據是已經在那了。
val ssc = new StreamingContext(sparkConf, Seconds(2))
def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")
val lines: DStream[String] =
ssc.fileStream[LongWritable, Text, TextInputFormat](loc, defaultFilter(_), false).map(_._2.toString)
lines.foreachRDD { x => x.foreach(println) }
我的想法是要堅持這個DSTREAM的內容到內存中,並委託維護 這個「批量查找緩存」星火的任務。 我希望在HDFS目錄每次更改後自動獲取新數據,我可以將其加入到其他數據流中。
我不明白:
- 當我開始加載數據,但這時如果 我在本地觸摸文件並覆蓋HDFS上的一個我不會看到 其內容打印應用出去了
- 如何緩存和重新加載這些數據?
- 當我緩存它會在工作節點或 這(隨着連接)將在驅動程序中發生?
我是否也應該將StreamingContext
時間間隔設置爲10分鐘,因爲我只會每10分鐘更改一次?