2017-04-27 61 views
2

我們使用Flink 1.2.0和建議的S3AFileSystem配置。當源代碼是S3存儲桶中的單個文件夾時,一個簡單的流式作業可以按預期工作。使用S3AFileSystem的Flink不會讀取S3中的子文件夾

作業運行時沒有錯誤 - 但是不是產生輸出 - 當它的源文件夾本身包含子文件夾時。

爲了清楚起見,以下是S3存儲桶的一個模型。運行作業以指向s3a://bucket/folder/2017/04/25/01/可以正確讀取存儲桶中出現的所有三個對象和任何後續對象。將作業指向s3a://bucket/folder/2017/(或任何其他中間文件夾)會導致作業不會產生任何內容。

在絕望中,我們嘗試了[in | ex]包含尾隨/的排列組合。

. 
`-- folder 
    `-- 2017 
     `-- 04 
      |-- 25 
      | |-- 00 
      | | |-- a.txt 
      | | `-- b.txt 
      | `-- 01 
      |  |-- c.txt 
      |  |-- d.txt 
      |  `-- e.txt 
      `-- 26 

工作代碼:

def main(args: Array[String]) { 

    val parameters = ParameterTool.fromArgs(args) 
    val bucket = parameters.get("bucket") 
    val folder = parameters.get("folder") 

    val path = s"s3a://$bucket/$folder" 

    val env = StreamExecutionEnvironment.getExecutionEnvironment 

    val lines: DataStream[String] = env.readFile(
    inputFormat = new TextInputFormat(new Path(path)), 
    filePath = path, 
    watchType = FileProcessingMode.PROCESS_CONTINUOUSLY, 
    interval = Time.seconds(10).toMilliseconds) 

    lines.print() 
    env.execute("Flink Streaming Scala API Skeleton") 
} 

核心的site.xml是按每文檔配置:

<configuration> 
    <property> 
    <name>fs.s3a.impl</name> 
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> 
    </property> 
    <property> 
    <name>fs.s3a.buffer.dir</name> 
    <value>/tmp</value> 
    </property> 
</configuration> 

這裏已包括所有S3AFileSystem這裏列出的罐子:https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/aws.html#flink-for-hadoop-27

我們很難過。這似乎應該工作;互聯網上有很多面包屑,表明這個做了的工作。 [例如,http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Reading-files-from-an-S3-folder-td10281.html]

幫助我,同伴松鼠...你是我唯一的希望!

+0

,我忘_possibly_相關項目加... https://issues.apache.org/jira/browse/HADOOP-13208表示版本2.8.0中對S3A listFiles的更改。 Flink文檔建議使用'hadoop-aws-2.7.2.jar'。 – StephenWithPH

回答

4

回答我自己的問題......在上面Steve Loughran的幫助下。

在弗林克,具有file-based data source to process continuously工作時,FileInputFormatenumerate nested files默認。

無論源是S3還是其他任何東西,情況都是如此。

您必須設置它像這樣:

def main(args: Array[String]) { 

    val parameters = ParameterTool.fromArgs(args) 
    val bucket = parameters.get("bucket") 
    val folder = parameters.get("folder") 

    val path = s"s3a://$bucket/$folder" 

    val env = StreamExecutionEnvironment.getExecutionEnvironment 

    val textInputFormat = new TextInputFormat(new Path(path)) 

    //this is important! 
    textInputFormat.setNestedFileEnumeration(true) 

    val lines: DataStream[String] = env.readFile(
    inputFormat = textInputFormat, 
    filePath = path, 
    watchType = FileProcessingMode.PROCESS_CONTINUOUSLY, 
    interval = Time.seconds(10).toMilliseconds) 

    lines.print() 
    env.execute("Flink Streaming Scala API Skeleton") 

}

0

什麼版本的Hadoop在這個下面?

如果在Hadoop 2.8中停止了,可能是迴歸,也許是我的錯。首先在FLINK下提供一個JIRA @issues.apache.org,然後,如果它在2.8.0中的新功能將它鏈接爲HADOOP-13208

這裏的代碼片段是一個很好的例子,可以用於迴歸測試,它是我曾爲Flink做過一些事情。

這個大listFiles()更改將文件的枚舉從遞歸樹形路徑移動到路徑下的所有子條目的一系列扁平列表:它非常適合其他任何事情(distcp,測試,配置單元,火花)和自16年12月以來一直在產品中出貨;如果是原因,我會有些驚訝,但不能否認責備。對不起

+0

最初,這是Flink 1.2.0/Scala 2.11/Hadoop 2.7.0,增加了hadoop-aws-2.7.2.jar(每個Flink文檔)。這是Flink網站的「工廠」下載選項之一。 我已經嘗試了各種Hadoop版本和/或hadoop-aws.jar版本並在本地構建。我已經升高到2.8.0並且降低到2.6.0,但沒有成功。 我本來希望** Hadoop 2.8.0可以解決它,但似乎Flink 1.2.0無法與該版本的Hadoop兼容。 Flink大師目前還沒有建設......我明天會再投入更多的東西。 – StephenWithPH

+0

那麼,如果它在Hadoop 2.7中,那麼它不是新的列表。爲什麼不看它是否在本地工作(file://)))或hdfs ...如果它在那裏工作而不在s3a上,那麼s3a是(不)這是可能的原因 –

+0

當然!首先檢查顯而易見的事物。我會在下面回答我自己的問題。這與s3a沒有任何關係,一切都與微妙的Flink選項有關。感謝您的幫助和耐心。 – StephenWithPH

相關問題