2015-11-14 25 views
3

我有一個RDD這是大到collect。我已對RDD應用了一系列轉換,並希望將其轉換後的數據直接從我的從站上的分區發送到S3。我目前的操作如下:如何將分區轉換後的數據發送到S3?

val rdd:RDD = initializeRDD 
val rdd2 = rdd.transform 
rdd2.first // in order to force calculation of RDD 
rdd2.foreachPartition sendDataToS3 

不幸的是,發送到S3的數據是未經轉換的。該RDD看起來就像它在階段initializeRDD

這裏是sendDataToS3的身體:

implicit class WriteableRDD[T](rdd:RDD[T]){ 

def transform:RDD[String] = rdd map {_.toString} 

.... 
def sendPartitionsToS3(prefix:String) = { 
    rdd.foreachPartition { p => 
    val filename = prefix+new scala.util.Random().nextInt(1000000) 
    val pw = new PrintWriter(new File(filename)) 
    p foreach pw.println 
    pw.close 
    s3.putObject(S3_BUCKET, filename, new File(filename)) 
    } 
    this 
} 

} 

這就是所謂的與rdd.transform.sendPartitionsToS3(prefix)

如何確保在sendDataToS3中發送的數據是轉換後的數據?

+0

我無法用您提供的代碼重現問題。 – zero323

+0

是否可以保存來自'foreachPartition'閉包的轉換數據,可能將它傳遞給S3? –

+0

我沒有嘗試過S3,但轉換後的數據正確傳遞給函數。 – zero323

回答

3

我的猜測是你的代碼中沒有包含在問題中的錯誤。

無論如何我只是回答,以確保您知道RDD.saveAsTextFile。你可以在S3上給它一個路徑(s3n://bucket/directory),它會直接從執行者將每個分區寫入該路徑。

我幾乎無法想象當你需要實現自己的sendPartitionsToS3而不是使用saveAsTextFile

相關問題