2015-09-01 41 views
2

我有以下代碼,試圖輸出RDD到1000個文件與等文件大小。但是,我仍然只有70個輸出文件,文件大小非常不同(範圍從50M到2G)。爲了使輸出文件具有相同的大小,是否需要執行額外的步驟?謝謝!Spark:如何將輸出(saveAsTextFile)保存爲大小相同的文件?

val myRDD = input.flatMap { t => ??? } 
       .reduceByKey { (t1, t2) => ??? ; t3 } 
       .sortBy(-_._2.size) 
       .repartition(1000) 
       .map(t => (t._1 + "_" + t._2.size, t._2.toString)) 

myRDD.saveAsTextFile("myOutput", classOf[GzipCodec]) 

回答

0

您可以使用RangePartitioner創建相同大小的分區,然後保存艾德里安爲文本文件。

實施例從there採取:

import org.apache.spark.RangePartitioner; 
var file=sc.textFile("<my local path>")  
var partitionedFile=file.map(x=>(x,1)) 
var data= partitionedFile.partitionBy(new RangePartitioner(3, partitionedFile)) 
data.glom().collect()(0).length 
data.glom().collect()(1).length 
data.glom().collect()(2).length 

在你的情況下,代替收集並檢查長度,運行saveAsTextFile()應該足夠了。

+0

會ü介紹如何使用它的例子?謝謝! – Edamame

+0

現在我很少有時間,所以我找到了一個現成的解決方案,請驗證它是否可以。 – Niemand

0

這是相當簡單明瞭,所有你需要做的是利用再分配(1000) 和你的文件大小將是平等的修改正是1000

您的代碼:

val myRDD = input.flatMap { t => ??? } 
       .reduceByKey { (t1, t2) => ??? ; t3 } 
       .sortBy(-_._2.size) 
       .repartition(1000) 
       .map(t => (t._1 + "_" + t._2.size, t._2.toString)).repartition(1000) 

myRDD.saveAsTextFile("myOutput", classOf[GzipCodec]) 
0
the following answer will solve your purpose 

val myRDD = input.flatMap { t => ??? } 
       .reduceByKey { (t1, t2) => ??? ; t3 } 
       .sortBy(-_._2.size) 
       .repartition(1000) 
       .map(t => (t._1 + "_" + t._2.size, t._2.toString)) 

myRDD.repartition(1000).saveAsTextFile("myOutput", classOf[GzipCodec]) 

One thing to note that original rdd will have it's existance even after this because it is immutable 

or even you can use coalesce(1000) if you want to set max partitions to set 
相關問題