2016-05-27 112 views
0

我在HDFS上有一個目錄,其中每個10分鐘複製一個文件(現有文件被覆蓋)。 我想用Spark流(1.6.0)讀取文件的內容,並將其用作參考數據將其加入其他流。使用Spark Streaming讀取fileStream

我設置了「記得窗口spark.streaming.fileStream.minRememberDuration爲「600S」,並設置newFilesOnly,因爲 當我啓動應用程序我wan't來從HDFS的初始數據是已經在那了。

val ssc = new StreamingContext(sparkConf, Seconds(2)) 
def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".") 
val lines: DStream[String] = 
    ssc.fileStream[LongWritable, Text, TextInputFormat](loc, defaultFilter(_), false).map(_._2.toString) 
lines.foreachRDD { x => x.foreach(println) } 

我的想法是要堅持這個DSTREAM的內容到內存中,並委託維護 這個「批量查找緩存」星火的任務。 我希望在HDFS目錄每次更改後自動獲取新數據,我可以將其加入到其他數據流中。

我不明白:

  • 當我開始加載數據,但這時如果 我在本地觸摸文件並覆蓋HDFS上的一個我不會看到 其內容打印應用出去了
  • 如何緩存和重新加載這些數據?
  • 當我緩存它會在工作節點或 這(隨着連接)將在驅動程序中發生?

我是否也應該將StreamingContext時間間隔設置爲10分鐘,因爲我只會每10分鐘更改一次?

回答

2

只是一些原始的想法。

當我開始加載數據,但這時如果我觸摸 該文件在本地和覆蓋一個HDFS上我不會看到它的內容 打印出來了

火花流媒體應用要處理數據,文件必須以原子方式創建,例如通過將文件移動到Spark正在監視的目錄中。文件重命名操作通常是原子的。你能測試這個來驗證它正在工作嗎?

如何緩存和重新加載此數據? 當我緩存它時,這將在工作節點上可用,或者這個 (連同連接)將在驅動程序中發生?

直接的解決方案可能是在foreachRDD()方法中註冊臨時表。在流式傳輸期間新數據將來臨時,可以重新創建相應的表格。請記住,foreachRDD()方法中的邏輯應該是冪等的。

瞭解表名,您可以輕鬆地創建一個單獨的管道進行查詢,該管道將連接來自此預先預存臨時表的數據。只要確保您將StreamingContext設置爲記住足夠數量的流數據,以便查詢可以運行。

我應該還的StreamingContext時間間隔設置爲10分鐘爲 我只會有每10分鐘變化?

在理想的情況下,節奏應該匹配。爲了安全起見,您也可以在foreachRDD()方法中收到新數據時檢查時間戳。

相關問題