2017-09-11 43 views
1

我實現了spark應用程序。 我創建了火花背景:Spark不會從s3讀取/寫入信息(ResponseCode = 400,ResponseMessage = Bad Request)

private JavaSparkContext createJavaSparkContext() { 
      SparkConf conf = new SparkConf(); 
      conf.setAppName("test"); 
      if (conf.get("spark.master", null) == null) { 
       conf.setMaster("local[4]"); 
      } 
      conf.set("fs.s3a.awsAccessKeyId", getCredentialConfig().getS3Key()); 
      conf.set("fs.s3a.awsSecretAccessKey", getCredentialConfig().getS3Secret()); 
      conf.set("fs.s3a.endpoint", getCredentialConfig().getS3Endpoint()); 

      return new JavaSparkContext(conf); 
     } 

我嘗試通過火花集API(火花SQL)從S3獲取數據:

 String s = "s3a://" + getCredentialConfig().getS3Bucket(); 
    Dataset<Row> csv = getSparkSession() 
         .read() 
         .option("header", "true") 
         .csv(s + "/dataset.csv"); 

    System.out.println("Read size :" + csv.count()); 

有一個錯誤:

Exception in thread "main" com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: 1A3E8CBD4959289D, AWS Error Code: null, AWS Error Message: Bad Request, S3 Extended Request ID: Q1Fv8sNvcSOWGbhJSu2d3Nfgow00388IpXiiHNKHz8vI/zysC8V8/YyQ1ILVsM2gWQIyTy1miJc= 

Hadoop版本:2.7

AWS端點:s3.eu-central-1.amazonaws.com

(Hadoop的2.8 - 一切工作正常)

回答

0

的問題是:法蘭克福不支持S3N。需要使用s3a。 而這個地區有V4 auth版本。 http://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region

歐盟(法蘭克福)EU-中央1版4只

這意味着公司需要啓用它AWS客戶端上。 需要補充的系統屬性

com.amazonaws.services.s3.enableV4 - >真

conf.set("com.amazonaws.services.s3.enableV4", "true");//doesn't work for me 

在本地機器我用:

System.setProperty("com.amazonaws.services.s3.enableV4", "true"); 

有關AWS運行EMR需要添加PARAMS火花提交:

spark.executor.extraJavaOptions=-Dcom.amazonaws.services.s3.enableV4=true 
spark.driver.extraJavaOptions=-Dcom.amazonaws.services.s3.enableV4=true 

此外,您應該爲文件系統添加類實施:

conf.set("spark.hadoop.fs.s3a.impl", org.apache.hadoop.fs.s3a.S3AFileSystem.class.getName()); 
conf.set("spark.hadoop.fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); 
conf.set("spark.hadoop.fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());