2017-04-18 23 views
0

我有一個火花流作業是閱讀ELB日誌從S3目錄, s3://elb-data/2017-04-17/, 解析它們,並將它們轉換爲ORC,然後將它們存儲在一個新的目錄:s3://parsed-data/2017-04-17/。這裏是我的代碼來做到這一點:更新星火變流在執行期間

val streamContext = new StreamingContext(sc, Seconds(30)) 
val rawLogFormat = new SimpleDateFormat("yyyy/MM/dd/") 
val rawLogDate = rawLogFormat.format(new java.util.Date()) 
val filepath = args(0) + rawLogDate 
val parsedLog = streamContext.textFileStream(filepath) 

val jsonRows = parsedLog.mapPartitions(lines => { 
    val txfm = new LogLine2Json 
    lines.map(line => 
    try{ 
     txfm.parseLine(line) 
    } 
    catch { 
     case e: Exception => {println(line); "";} 
    } 
) 
}) 

在新的一天開始時,AWS會自動將登錄到一個新的目錄,我想我的工作流參考哪些。 (s3://elb-logs/2017-04-18/)但是,我注意到,一旦我的工作到了04/17數據的末尾,它就停止看到新的文件。有沒有辦法在代碼執行時更新這個變量?或者這是否要求我通過​​提交新工作?感謝您的幫助

回答

0

我認爲您的filepath對您的流式作業保持不變,它會一直解析這個相同的目錄s3://parsed-data/2017-04-17/。你需要使它指向你的根目錄,即s3://parsed-data/,那麼如果有新的日期s3://parsed-data/2017-04-18/它應該能夠拿起新的數據。也看到這個SO的帖子。

+0

我唯一的問題是我是否可以相應地更改輸出的位置?我需要's3:// elb-logs/2017-04-17 /'中的所有數據結束於's3:// parsed-logs/2017-04-17 /'和s3中的所有數據://elb-logs/2017-04-18 /'登陸's3:// parsed-logs/2017-04-18 /'等。這可能與通配符解決方案有關嗎? – jpavs