3
我有一個RDD
這是大到collect
。我已對RDD
應用了一系列轉換,並希望將其轉換後的數據直接從我的從站上的分區發送到S3。我目前的操作如下:如何將分區轉換後的數據發送到S3?
val rdd:RDD = initializeRDD
val rdd2 = rdd.transform
rdd2.first // in order to force calculation of RDD
rdd2.foreachPartition sendDataToS3
不幸的是,發送到S3的數據是未經轉換的。該RDD
看起來就像它在階段initializeRDD
。
這裏是sendDataToS3的身體:
implicit class WriteableRDD[T](rdd:RDD[T]){
def transform:RDD[String] = rdd map {_.toString}
....
def sendPartitionsToS3(prefix:String) = {
rdd.foreachPartition { p =>
val filename = prefix+new scala.util.Random().nextInt(1000000)
val pw = new PrintWriter(new File(filename))
p foreach pw.println
pw.close
s3.putObject(S3_BUCKET, filename, new File(filename))
}
this
}
}
這就是所謂的與rdd.transform.sendPartitionsToS3(prefix)
。
如何確保在sendDataToS3
中發送的數據是轉換後的數據?
我無法用您提供的代碼重現問題。 – zero323
是否可以保存來自'foreachPartition'閉包的轉換數據,可能將它傳遞給S3? –
我沒有嘗試過S3,但轉換後的數據正確傳遞給函數。 – zero323