2017-02-20 38 views
3

我正在使用RunJobFlow命令啓動Spark EMR集羣。該命令將JobFlowRole設置爲具有策略AmazonElasticMapReduceforEC2RoleAmazonRedshiftReadOnlyAccess的IAM角色。第一個策略包含允許所有s3權限的操作。EMR Spark無法將數據幀保存到S3

當EC2實例旋轉起來時,它們承擔此IAM角色,並通過STS生成臨時憑證。

我所做的第一件事是從我的Redshift羣集中讀取一張表格,使用com.databricks.spark.redshift格式並使用相同的IAM角色從Esh JobFlowRole中卸載紅移數據。

據我所知,這會在Redshift上運行一個UNLOAD命令來轉儲到我指定的S3存儲桶中。 Spark然後將新卸載的數據加載到Dataframe中。對於tempdir選項,我使用推薦的s3n://協議。

這個命令很好用,它總是成功地將數據加載到數據框中。

然後我運行一些轉換並嘗試將csv格式的數據幀保存到相同的S3存儲桶Redshift Unloaded中。

然而,當我嘗試這樣做,它會引發以下錯誤

java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively)

好。所以我不知道爲什麼發生這種情況,但我試圖通過設置推薦的hadoop配置參數來解決這個問題。然後我用DefaultAWSCredentialsProviderChain通過

spark.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", <CREDENTIALS_ACCESS_KEY>) spark.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", <CREDENTIALS_SECRET_ACCESS_KEY>)

加載AWSAccessKeyIDAWSSecretKey並設置當我再次運行它,它引發以下錯誤:

java.io.IOException: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: The AWS Access Key Id you provided does not exist in our records. (Service: Amazon S3; Status Code: 403; Error Code: InvalidAccessKeyId;

好。所以這沒有奏效。然後我刪除設置Hadoop的配置,並通過s3n://ACCESS_KEY:[email protected]/KEY

硬編碼的IAM用戶在S3網址憑據當我跑這它吐出以下錯誤:

java.lang.IllegalArgumentException: Bucket name should be between 3 and 63 characters long

所以它試圖創建一個水桶..這絕對不是我們想要它做的。

我真的被困在這一個,非常感謝這裏的任何幫助!當我在本地運行它時,它工作正常,但在EMR上完全失敗。

回答

0

在亞馬遜EMR上,嘗試使用前綴s3://來引用S3中的對象。

這是a long story

1

問題是以下幾點:

  • EC2實例生成的臨時憑證上EMR引導階段
  • 當我詢問紅移,我通過aws_iam_role到theDatabricks驅動程序。然後,驅動程序爲相同的IAM角色重新生成臨時憑證。這使EC2實例生成的憑證無效。
  • 我又試圖使用舊憑據(和被存儲在實例的元數據的憑據)

它失敗,因爲它試圖使用了過期的證書上傳到S3。

的解決方案是通過aws_iam_role以除去紅移授權,並與下列替換:

val credentials = EC2MetadataUtils.getIAMSecurityCredentials ... .option("temporary_aws_access_key_id", credentials.get(IAM_ROLE).accessKeyId) .option("temporary_aws_secret_access_key", credentials.get(IAM_ROLE).secretAccessKey) .option("temporary_aws_session_token", credentials.get(IAM_ROLE).token)