2016-08-03 122 views
0

我目前正在探索Spark。我面臨以下任務 - 獲取RDD,根據特定條件對其進行分區,然後將多個文件寫入S3存儲桶中的不同文件夾中。Spark RDD foreach Partition to S3

一切都很好,直到我們來上傳到S3部分。我已閱讀所有關於這個問題的問題,並發現我可以使用AmazonS3ClientsaveToTextFile方法進行RDD。有兩個問題,我面對:

  1. 如果我與AmazonS3Client去我得到一個java.io.NotSerializableException因爲代碼是從星火驅動程序發送給它需要序列化和顯然的AmazonS3Client不支持工人那。

  2. 如果我去saveToTextFile我也遇到類似的問題。當我進入foreachPartition循環時,我需要獲得Iterable[T](在這種情況下爲p),所以如果我想使用saveToTextFile,則需要創建Iterable的RDD,因此需要創建parallelize。問題是SparkContext sc也(沒錯)不會序列化。

rdd.foreachPartition { p => sc.parallelize(p.toSeq).saveAsTextFile(s"s3n://") }

任何幫助將不勝感激。

回答

2

有沒有必要這樣做。你可以只用saveAsTextFile與RDD:

rdd.saveAsTextFile(s"s3n://dir/to/aux/file") 

saveAsTextFile將寫入S3與文件的許多部分的文件夾中(如許多地區的分區)。然後你可以合併成一個文件,如果你想:

def mergeToS3(srcPath: String, dstPath: String, sc: SparkContext): Unit = { 
    val hadoopConfig = sc.hadoopConfiguration 
    val fs = FileSystem.get(new URI(srcPath), hadoopConfig) 
    FileUtil.copyMerge(fs, new Path(srcPath), fs, new Path(dstPath), true, hadoopConfig, null) 
    } 

    mergeToS3("s3n://dir/to/aux/file", "s3n://dir/to/singleFile",sc)