I have a small test project that pushes data to an S3 bucket. However, it looks like the core-site.xml file is not being read, because I get the error java.io.IOException: No file system found with scheme s3a. How do I get core-site.xml read correctly and push data to S3 with Apache Flink?
Here is the code:
import java.util.Map;
import java.util.UUID;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class S3Sink {
    public static void main(String[] args) throws Exception {
        // ConfigUtils: helper that loads the YAML config into a map (not part of Flink).
        Map<String, String> configs = ConfigUtils.loadConfigs("path/to/config.yaml");
        final ParameterTool parameterTool = ParameterTool.fromMap(configs);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.getConfig().setGlobalJobParameters(parameterTool);

        // Consume strings from Kafka.
        DataStream<String> messageStream = env
                .addSource(new FlinkKafkaConsumer09<String>(
                        parameterTool.getRequired("kafka.topic"),
                        new SimpleStringSchema(),
                        parameterTool.getProperties()));

        // Write each record to a text file in the S3 bucket.
        String id = UUID.randomUUID().toString();
        messageStream.writeAsText("s3a://flink-test/" + id + ".txt").setParallelism(1);

        env.execute();
    }
}
Here is the configuration change in the flink-conf.yaml file that references the core-site.xml file:
fs.hdfs.hadoopconf: /path/to/core-site/etc/hadoop/
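For reference, fs.hdfs.hadoopconf is expected to point at the directory that contains core-site.xml, not at the file itself; a sketch of the assumed layout (the path is just the placeholder used above):

/path/to/core-site/etc/hadoop/
└── core-site.xml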
Here is my core-site.xml:
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>

    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>

    <!-- Comma-separated list of local directories used to buffer
         large results prior to transmitting them to S3. -->
    <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
    </property>

    <!-- set your AWS access key ID using the key defined in org.apache.hadoop.fs.s3a.Constants -->
    <property>
        <name>fs.s3a.awsAccessKeyId</name>
        <value>*****</value>
    </property>

    <!-- set your AWS secret access key -->
    <property>
        <name>fs.s3a.awsSecretAccessKey</name>
        <value>*****</value>
    </property>
</configuration>
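As a point of comparison, a minimal core-site.xml sketch that binds the s3a:// scheme explicitly would use the fs.s3a.* keys (fs.s3a.impl rather than fs.s3.impl, and fs.s3a.access.key / fs.s3a.secret.key for credentials in Hadoop 2.7-era S3A). Whether this is what resolves the error here is an assumption, not something confirmed in the thread:

<configuration>
    <!-- Assumed alternative: map the s3a scheme itself to the S3A implementation. -->
    <property>
        <name>fs.s3a.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>
    <!-- Standard S3A credential keys in Hadoop 2.7+ (assumes that Hadoop version is in use). -->
    <property>
        <name>fs.s3a.access.key</name>
        <value>*****</value>
    </property>
    <property>
        <name>fs.s3a.secret.key</name>
        <value>*****</value>
    </property>
</configuration>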
Does it work if you set 'fs.hdfs.hadoopconf' to the folder containing 'core-site.xml'? Also make sure the '$HADOOP_HOME' environment variable is set correctly. –
I am using IntelliJ and set the HADOOP_HOME environment variable to the core-site.xml path. I am running the program locally, so the fs.hdfs.hadoopconf setting does not take effect. – Sam
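One option when running from the IDE is to hand the setting to Flink programmatically instead of relying on flink-conf.yaml. The sketch below is an assumption, not something verified in this thread: it uses org.apache.flink.core.fs.FileSystem.initialize(Configuration), which only exists in Flink 1.5 and later, and simply forwards the same fs.hdfs.hadoopconf value shown above.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;

public class LocalS3Setup {
    // Hypothetical helper (assumes Flink 1.5+): call this at the top of main() so the
    // Hadoop conf directory is passed to Flink's FileSystem layer programmatically,
    // since flink-conf.yaml is not read when the job runs inside the IDE.
    public static void initHadoopConf() throws Exception {
        Configuration flinkConf = new Configuration();
        // Same directory that fs.hdfs.hadoopconf points at in flink-conf.yaml above.
        flinkConf.setString("fs.hdfs.hadoopconf", "/path/to/core-site/etc/hadoop/");
        FileSystem.initialize(flinkConf);
    }
}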