2015-04-03 63 views
11

我設置一個簡單的測試,流從S3的文本文件,當我試圖像星火流textFileStream不支持通配符

val input = ssc.textFileStream("s3n://mybucket/2015/04/03/") 

得到它的工作,並在桶我本來的日誌文件去那裏一切都會正常工作。

但是,如果他們是一個子文件夾,它不會發現,得到了投入的子文件夾的任何文件(是的,我知道HDFS實際上並未使用的文件夾結構)

val input = ssc.textFileStream("s3n://mybucket/2015/04/") 

所以,我想簡單地做通配符像我曾與一個標準的火花應用程序之前完成

val input = ssc.textFileStream("s3n://mybucket/2015/04/*") 

但當我嘗試這個,它拋出一個錯誤

java.io.FileNotFoundException: File s3n://mybucket/2015/04/* does not exist. 
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:506) 
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1483) 
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1523) 
at org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:176) 
at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:134) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) 
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) 
at scala.Option.orElse(Option.scala:257) 
..... 

我知道一個事實,即在讀標準Spark應用程序的fileInput時可以使用通配符,但似乎在進行流輸入時,它不會這樣做,也不會自動處理子文件夾中的文件。有什麼我在這裏失蹤?

最終什麼,我需要的是全天候運行streaming作業將被監視具有按日期放在它的日誌的S3桶

因此,像

s3n://mybucket/<YEAR>/<MONTH>/<DAY>/<LogfileName> 

有什麼方式將其交給最頂層的文件夾,並自動讀取顯示在任何文件夾中的文件(因爲顯然日期會每天增加)?

編輯

所以在挖掘到的文檔在http://spark.apache.org/docs/latest/streaming-programming-guide.html#basic-sources它指出嵌套目錄,不支持。

誰能說出爲什麼會出現這種情況?

另外,由於我的文件將根據日期進行嵌套,因此在流式傳輸應用程序中解決此問題的好方法是什麼?這有點複雜,因爲日誌需要幾分鐘才能寫入S3,所以即使我們在新的一天進入了幾分鐘,也可以將前一天寫入的最後一個文件寫入前一天的文件夾。

+0

其實我不確定s3是否支持通配符...... – eliasah 2015-04-03 09:19:24

+1

它當然是。過去8個月,我的工作一直使用通配符。另外,僅僅爲了理智檢查,我只是用通配符輸入了一個工作,工作得很好。 我也注意到,這是一個有點挑剔要求,你不這樣做 S3N:// mybucket/2015/04 * 如說 異常線程「main」產生java.io.IOException :不是一個文件:S3N:// mybucket/2015/4月1日 這是有道理的,因爲它不是一個文件 但是,如果你 S3N:// mybucket/2015/04/* 它正確解析天子文件夾中的所有文件.... 這種感覺對我來說就像一個錯誤。 – 2015-04-03 17:00:07

+1

我要投票回答這個問題。我記得有類似的問題,但我不記得我是如何解決它的。 – eliasah 2015-04-03 17:03:09

回答

0

我們有同樣的問題。我們用逗號加入了子文件夾名稱。

List<String> paths = new ArrayList<>(); 
SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd"); 

try {   
    Date start = sdf.parse("2015/02/01"); 
    Date end = sdf.parse("2015/04/01"); 

    Calendar calendar = Calendar.getInstance(); 
    calendar.setTime(start);   

    while (calendar.getTime().before(end)) { 
     paths.add("s3n://mybucket/" + sdf.format(calendar.getTime())); 
     calendar.add(Calendar.DATE, 1); 
    }     
} catch (ParseException e) { 
    e.printStackTrace(); 
} 

String joinedPaths = StringUtils.join(",", paths.toArray(new String[paths.size()])); 
val input = ssc.textFileStream(joinedPaths); 

我希望以這種方式解決您的問題。

+0

很酷。你如何處理更大的結束日期?通過編譯和重新啓動程序?或者我錯過了什麼? – 2016-02-24 22:34:46

6

通過擴展FileInputDStream可以創建一些「醜陋但工作的解決方案」。 寫作sc.textFileStream(d)相當於

new FileInputDStream[LongWritable, Text, TextInputFormat](streamingContext, d).map(_._2.toString) 

您可以創建CustomFileInputDStream,將延長FileInputDStream。自定義類將從FileInputDStream類複製計算方法,並根據需要調整findNewFiles方法。

改變findNewFiles方法來自:

private def findNewFiles(currentTime: Long): Array[String] = { 
    try { 
     lastNewFileFindingTime = clock.getTimeMillis() 

    // Calculate ignore threshold 
    val modTimeIgnoreThreshold = math.max(
    initialModTimeIgnoreThreshold, // initial threshold based on newFilesOnly setting 
    currentTime - durationToRemember.milliseconds // trailing end of the remember window 
) 
    logDebug(s"Getting new files for time $currentTime, " + 
    s"ignoring files older than $modTimeIgnoreThreshold") 
    val filter = new PathFilter { 
    def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold) 
    } 
    val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString) 
    val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime 
    logInfo("Finding new files took " + timeTaken + " ms") 
    logDebug("# cached file times = " + fileToModTime.size) 
    if (timeTaken > slideDuration.milliseconds) { 
    logWarning(
     "Time taken to find new files exceeds the batch size. " + 
     "Consider increasing the batch size or reducing the number of " + 
     "files in the monitored directory." 
    ) 
    } 
    newFiles 
} catch { 
    case e: Exception => 
    logWarning("Error finding new files", e) 
    reset() 
    Array.empty 
} 

}

到:

private def findNewFiles(currentTime: Long): Array[String] = { 
    try { 
     lastNewFileFindingTime = clock.getTimeMillis() 

     // Calculate ignore threshold 
     val modTimeIgnoreThreshold = math.max(
     initialModTimeIgnoreThreshold, // initial threshold based on newFilesOnly setting 
     currentTime - durationToRemember.milliseconds // trailing end of the remember window 
    ) 
     logDebug(s"Getting new files for time $currentTime, " + 
     s"ignoring files older than $modTimeIgnoreThreshold") 
     val filter = new PathFilter { 
     def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold) 
     } 
     val directories = fs.listStatus(directoryPath).filter(_.isDirectory) 
     val newFiles = ArrayBuffer[FileStatus]() 

     directories.foreach(directory => newFiles.append(fs.listStatus(directory.getPath, filter) : _*)) 

     val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime 
     logInfo("Finding new files took " + timeTaken + " ms") 
     logDebug("# cached file times = " + fileToModTime.size) 
     if (timeTaken > slideDuration.milliseconds) { 
     logWarning(
      "Time taken to find new files exceeds the batch size. " + 
      "Consider increasing the batch size or reducing the number of " + 
      "files in the monitored directory." 
     ) 
     } 
     newFiles.map(_.getPath.toString).toArray 
    } catch { 
     case e: Exception => 
     logWarning("Error finding new files", e) 
     reset() 
     Array.empty 
    } 
    } 

會檢查文件中的所有一級子文件夾,你可以調整它使用批處理時間戳以訪問相關的「子目錄」。

我創建的CustomFileInputDStream正如我所提到,並通過調用激活它:

new CustomFileInputDStream[LongWritable, Text, TextInputFormat](streamingContext, d).map(_._2.toString) 

這似乎表現我們的預期。

當我寫的解決方案這樣我必須補充幾點考慮:

  • 您正在打破星火封裝和創造,你將不得不完全支持隨着時間的循環中的自定義類。

  • 我相信像這樣的解決方案是最後的選擇。如果你的用例可以通過不同的方式實現,通常避免這樣的解決方案會更好。

  • 如果你在S3上會有很多「子目錄」,並且會檢查它們中的每一個,這會花費你。

  • 理解Databricks是否僅僅因爲可能的性能損失而不支持嵌套文件會非常有趣,也許還有更深層次的原因我沒有想過。

+0

我有一個類似的用例,如果我找不到替代方案,我正在考慮沿着這條路走下去。我使用格式爲YYYY-MM-DD-HH的日期分區子文件夾。每個小時都會創建一個新文件夾並將文件上傳到其中。所以我不一定要掃描所有的子文件夾(只有最後三個),並不會遇到性能問題。我更擔心這種代碼和狀態管理的重新啓動(哪個小時文件夾+文件最後一次掃描等)的可維護性。看看你是否可以分享你的想法,甚至可以爲你的自定義FileDstream工作的代碼。 – Cheeko 2016-03-01 21:11:52

+0

如果您在流中使用檢查點目錄,那麼當您重新啓動應用程序時,您將首先重新安排應用程序停機期間應執行的所有批次。例如,如果您的流式傳輸間隔爲1分鐘,並且您的應用程序在10:00下班並在10:30後備份,那麼當應用程序啓動時,該應用程序將嘗試執行批次爲10:01,10:02等。 現在,如果您執行findNewFiles(currentTime)的方式是您掃描的文件夾是從當前時間派生的,那麼您將能夠在重新啓動後掃描「正確」的文件。 – 2016-03-02 12:01:53

+0

請注意currentTime實際上不是CURRENT時間,而是批次的時間。 我能想到的唯一問題就是如果你的文件不是不可變的。例如,你在10:10將一些數據寫入文件A並在10:20覆蓋這些數據,那麼如果你的應用程序在10:10-10:20之間停止工作,那麼你將失去對A的第一次寫入。確實是一個問題,但我並不熟悉在這種情況下使用可變文件的許多組織。 – 2016-03-02 12:05:07