0
我有一個火花流作業是閱讀ELB日誌從S3目錄, s3://elb-data/2017-04-17/
, 解析它們,並將它們轉換爲ORC,然後將它們存儲在一個新的目錄:s3://parsed-data/2017-04-17/
。這裏是我的代碼來做到這一點:更新星火變流在執行期間
val streamContext = new StreamingContext(sc, Seconds(30))
val rawLogFormat = new SimpleDateFormat("yyyy/MM/dd/")
val rawLogDate = rawLogFormat.format(new java.util.Date())
val filepath = args(0) + rawLogDate
val parsedLog = streamContext.textFileStream(filepath)
val jsonRows = parsedLog.mapPartitions(lines => {
val txfm = new LogLine2Json
lines.map(line =>
try{
txfm.parseLine(line)
}
catch {
case e: Exception => {println(line); "";}
}
)
})
在新的一天開始時,AWS會自動將登錄到一個新的目錄,我想我的工作流參考哪些。 (s3://elb-logs/2017-04-18/
)但是,我注意到,一旦我的工作到了04/17數據的末尾,它就停止看到新的文件。有沒有辦法在代碼執行時更新這個變量?或者這是否要求我通過提交新工作?感謝您的幫助
我唯一的問題是我是否可以相應地更改輸出的位置?我需要's3:// elb-logs/2017-04-17 /'中的所有數據結束於's3:// parsed-logs/2017-04-17 /'和s3中的所有數據://elb-logs/2017-04-18 /'登陸's3:// parsed-logs/2017-04-18 /'等。這可能與通配符解決方案有關嗎? – jpavs