2015-07-18 66 views
7

我有一個公共可用的Amazon s3資源(文本文件)並希望從spark中訪問它。這意味着 - 我沒有任何憑據亞馬遜 - 它工作得很好,如果我想只要下載​​:從Apache Spark訪問公共可用的Amazon S3文件

val bucket = "<my-bucket>" 
val key = "<my-key>" 

val client = new AmazonS3Client 
val o = client.getObject(bucket, key) 
val content = o.getObjectContent // <= can be read and used as input stream 

然而,當我嘗試從火花背景

val conf = new SparkConf().setAppName("app").setMaster("local") 
val sc = new SparkContext(conf) 
val f = sc.textFile(s"s3a://$bucket/$key") 
println(f.count()) 

訪問同一資源我收到堆棧跟蹤以下錯誤:

Exception in thread "main" com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain 
    at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117) 
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521) 
    at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031) 
    at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994) 
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297) 
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2653) 
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:92) 
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2687) 
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2669) 
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:371) 
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295) 
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:221) 
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270) 
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) 
    at scala.Option.getOrElse(Option.scala:121) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) 
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) 
    at scala.Option.getOrElse(Option.scala:121) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781) 
    at org.apache.spark.rdd.RDD.count(RDD.scala:1099) 
    at com.example.Main$.main(Main.scala:14) 
    at com.example.Main.main(Main.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) 

我不想提供任何AWS憑據 - 我只是想訪問資源匿名(現在) - 如何實現這一目標?我可能需要使它像AnonymousAWSCredentialsProvider - 但如何把它放在火花或hadoop?

P.S.我的build.sbt以防萬一

scalaVersion := "2.11.7" 

libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "1.4.1", 
    "org.apache.hadoop" % "hadoop-aws" % "2.7.1" 
) 

更新:我做了一些調查後 - 我看到了它爲什麼沒有工作的原因。

首先,S3AFileSystem創建AWS客戶端憑證的順序如下:

AWSCredentialsProviderChain credentials = new AWSCredentialsProviderChain(
    new BasicAWSCredentialsProvider(accessKey, secretKey), 
    new InstanceProfileCredentialsProvider(), 
    new AnonymousAWSCredentialsProvider() 
); 

「ACCESSKEY」和「祕密密鑰」值從火花的conf例如採取(密鑰必須「fs.s3a。 access.key「和」fs.s3a.secret.key「org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY和org.apache.hadoop.fs.s3a.Constants.SECRET_KEY常量,這樣更方便)。

其次 - 您可能會看到AnonymousAWSCredentialsProvider是第三個選項(最後優先級) - 可能會出現什麼錯誤?請參見AnonymousAWSCredentials的實現:

public class AnonymousAWSCredentials implements AWSCredentials { 

    public String getAWSAccessKeyId() { 
     return null; 
    } 

    public String getAWSSecretKey() { 
     return null; 
    } 
} 

它對訪問密鑰和密鑰都只是返回null。聽起來很合理。但是,看看裏面AWSCredentialsProviderChain:

AWSCredentials credentials = provider.getCredentials(); 

if (credentials.getAWSAccessKeyId() != null && 
    credentials.getAWSSecretKey() != null) { 
    log.debug("Loading credentials from " + provider.toString()); 

    lastUsedProvider = provider; 
    return credentials; 
} 

它不選擇供應商的情況下,兩個鍵是空 - 這意味着匿名憑據不能工作。看起來像aws-java-sdk-1.7.4中的一個bug。我試圖使用最新版本 - 但它與hadoop-aws-2.7.1不兼容。

還有其他想法嗎?

+0

你有過任何成功,可能與最近的版本提供共享? –

+0

不,我沒有嘗試這一段時間 - 我甚至忘了它,不要使用亞馬遜s3的任何東西 – pkozlov

回答

3

我個人從未訪問過Spark的公共數據。您可以嘗試使用虛擬憑證,或僅爲此用途創建虛擬憑證。直接在SparkConf對象上設置它們。

val sparkConf: SparkConf = ??? 
val accessKeyId: String = ??? 
val secretAccessKey: String = ??? 
sparkConf.set("spark.hadoop.fs.s3.awsAccessKeyId", accessKeyId) 
sparkConf.set("spark.hadoop.fs.s3n.awsAccessKeyId", accessKeyId) 
sparkConf.set("spark.hadoop.fs.s3.awsSecretAccessKey", secretAccessKey) 
sparkConf.set("spark.hadoop.fs.s3n.awsSecretAccessKey", secretAccessKey) 

作爲替代,讀取DefaultAWSCredentialsProviderChain文件,看看那裏的憑據尋找。該列表(順序很重要)是:

  • 環境變量 - AWS_ACCESS_KEY_ID和AWS_SECRET_KEY
  • Java系統屬性 - aws.accessKeyId和aws.secretKey
  • 憑據配置文件的文件在默認位置(〜/ 。AWS /憑證)的所有AWS的SDK和AWS CLI
  • 實例檔憑證,通過亞馬遜EC2元數據服務
+0

某些東西仍然是錯誤的。我將以下值添加到您給我的密鑰(確切字符串「aaa」作爲虛擬憑據)。我希望在最壞的情況下看到auth錯誤,但我看到了同樣的例外:「無法從鏈中的任何提供程序加載AWS憑證」 – pkozlov

+1

正確的密鑰必須是「spark.hadoop.fs.s3a.access.key」和「 spark.hadoop.fs.s3a.secret.key'順便說一句,提供虛擬值並沒有幫助 - 現在我看到了403錯誤。看起來像AWS S3的火花使用匿名憑證是不可能的。 根據源代碼 - 憑證的順序是不同 AWSCredentialsProviderChain憑證=新AWSCredentialsProviderChain( 新BasicAWSCredentialsProvider(ACCESSKEY,祕密密鑰), 新InstanceProfileCredentialsProvider(), 新AnonymousAWSCredentialsProvider() ); 和。匿名根本不起作用。 – pkozlov

+0

好的,對不起,我沒有看到你使用's3a'協議。你用s3n嗎? –