2017-02-24 96 views
0

我正在EMR上運行Spark工作,但需要創建檢查點。我嘗試使用S3,但得到這個錯誤消息Spark AWS emr檢查點位置

17/02/24 14:34:35 ERROR ApplicationMaster: User class threw exception: 
java.lang.IllegalArgumentException: Wrong FS: s3://spark- 
jobs/checkpoint/31d57e4f-dbd8-4a50-ba60-0ab1d5b7b14d/connected- 
components-e3210fd6/2, expected: hdfs://ip-172-18-13-18.ec2.internal:8020 
java.lang.IllegalArgumentException: Wrong FS: s3://spark- 
jobs/checkpoint/31d57e4f-dbd8-4a50-ba60-0ab1d5b7b14d/connected- 
components-e3210fd6/2, expected: hdfs://ip-172-18-13-18.ec2.internal:8020 

這裏是我的示例代碼

... 
val sparkConf = new SparkConf().setAppName("spark-job") 
    .set("spark.default.parallelism", (CPU * 3).toString) 
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
    .registerKryoClasses(Array(classOf[Member], classOf[GraphVertex], classOf[GraphEdge])) 
    .set("spark.dynamicAllocation.enabled", "true") 


implicit val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() 

sparkSession.sparkContext.setCheckpointDir("s3://spark-jobs/checkpoint") 
.... 

我怎樣才能檢查站AWS EMR?

+0

請提供配置文件以正確查看情況。 – FaigB

+0

@FaigB剛剛做了,謝謝 –

回答

1

在主分支中有一個修復方法可以讓檢查點也是s3。我能夠建立反對它,它的工作,所以這應該是下一個版本的一部分。

+1

中沒有'.getOrCreate(...)'很好聽。不要在檢查點之間添加相當長的時間間隔,因爲它需要很長時間才能保存......從不頻繁的檢查點重建它可能會更快,並且檢查點本身的perf命中會更少。因人而異 –

0

試試用AWS authenticaton像:

val hadoopConf: Configuration = new Configuration() 
    hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem") 
    hadoopConf.set("fs.s3n.awsAccessKeyId", "id-1") 
    hadoopConf.set("fs.s3n.awsSecretAccessKey", "secret-key") 

    sparkSession.sparkContext.getOrCreate(checkPointDir,() => 
     { createStreamingContext(checkPointDir, config) }, hadoopConf) 
+0

在sparkContext –

1

有用於星火這意味着你只能檢查點到默認FS,沒有其他任何一個(像S3)一個已經得到解決的錯誤。這是固定在主,不知道backports。

如果它讓你感覺更好,檢查點的工作方式如下:write然後rename()對象存儲足夠慢,你可能發現自己的本地檢查點更好,然後自己上傳到s3。