2017-06-02 40 views
0

我正在處理一個應用程序,其中每30秒(也可以是5秒),某些文件將被放入文件系統中。我必須讀它解析它並將一些記錄推送到REDIS。Spark本地文件流 - 容錯

在每個文件中,所有記錄都是獨立的,我沒有做任何計算,需要updateStateByKey

我的問題是,如果由於某些問題(例如:REDIS連接問題,文件中的數據問題等)某些文件沒有完全處理我想重新處理(說n次)的文件,並保持跟蹤已處理的文件。

出於測試目的,我正在從本地文件夾讀取數據。另外,我不知道如何得出結論,一個文件被完全處理並將其標記爲完成(也就是在這個文件處理文本文件或數據庫寫入)

val lines = ssc.textFileStream("E:\\SampleData\\GG") 
val words = lines.map(x=>x.split("_")) 
words.foreachRDD(
    x=> { 
    x.foreach(   
     x => { 
     var jedis = jPool.getResource(); 
     try{ 
      i=i+1 
      jedis.set("x"+i+"__"+x(0)+"__"+x(1), x(2)) 
     }finally{ 
      jedis.close() 
     } 
     } 
    ) 
    } 
) 

回答

相關問題