我們使用Flink 1.2.0和建議的S3AFileSystem配置。當源代碼是S3存儲桶中的單個文件夾時,一個簡單的流式作業可以按預期工作。使用S3AFileSystem的Flink不會讀取S3中的子文件夾
作業運行時沒有錯誤 - 但是不是產生輸出 - 當它的源文件夾本身包含子文件夾時。
爲了清楚起見,以下是S3存儲桶的一個模型。運行作業以指向s3a://bucket/folder/2017/04/25/01/
可以正確讀取存儲桶中出現的所有三個對象和任何後續對象。將作業指向s3a://bucket/folder/2017/
(或任何其他中間文件夾)會導致作業不會產生任何內容。
在絕望中,我們嘗試了[in | ex]包含尾隨/
的排列組合。
.
`-- folder
`-- 2017
`-- 04
|-- 25
| |-- 00
| | |-- a.txt
| | `-- b.txt
| `-- 01
| |-- c.txt
| |-- d.txt
| `-- e.txt
`-- 26
工作代碼:
def main(args: Array[String]) {
val parameters = ParameterTool.fromArgs(args)
val bucket = parameters.get("bucket")
val folder = parameters.get("folder")
val path = s"s3a://$bucket/$folder"
val env = StreamExecutionEnvironment.getExecutionEnvironment
val lines: DataStream[String] = env.readFile(
inputFormat = new TextInputFormat(new Path(path)),
filePath = path,
watchType = FileProcessingMode.PROCESS_CONTINUOUSLY,
interval = Time.seconds(10).toMilliseconds)
lines.print()
env.execute("Flink Streaming Scala API Skeleton")
}
核心的site.xml是按每文檔配置:
<configuration>
<property>
<name>fs.s3a.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<property>
<name>fs.s3a.buffer.dir</name>
<value>/tmp</value>
</property>
</configuration>
這裏已包括所有S3AFileSystem這裏列出的罐子:https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/aws.html#flink-for-hadoop-27
我們很難過。這似乎應該工作;互聯網上有很多面包屑,表明這個做了的工作。 [例如,http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Reading-files-from-an-S3-folder-td10281.html]
幫助我,同伴松鼠...你是我唯一的希望!
,我忘_possibly_相關項目加... https://issues.apache.org/jira/browse/HADOOP-13208表示版本2.8.0中對S3A listFiles的更改。 Flink文檔建議使用'hadoop-aws-2.7.2.jar'。 – StephenWithPH