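I am trying to union two RDDs that are already distributed across our cluster, hash-partitioned on the key. I do not need to preserve any ordering or even the partitioning; I just want the union to be as fast as possible. In this example I do want all records, not just the distinct ones, so multiplicity must be kept. What is the difference between an Apache Spark RDD's union and zipPartitions?
Here is what I would naively use: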
val newRDD = tempRDD1.union(tempRDD2)
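Here is what someone recommended to me as being faster, because it takes advantage of how the RDDs are already partitioned and distributed: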
val newRDD = tempRDD1.zipPartitions(tempRDD2, preservesPartitioning=true)((iter, iter2) => iter++iter2)
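Which of these is faster? And are the results completely consistent, member-wise?
I ask because until now I thought these methods were equivalent, but when I scaled up my data and the number of partitions, executors, memory, and so on, I started getting weird results from the zipPartitions approach, which does not work correctly with reduceByKey afterwards. Perhaps the differences are due to my RDDs themselves, which have the form ((String, String), (String, Long, Long, Long, Long)), so maybe iter ++ iter2 is doing something other than unioning those values?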
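As a point of reference for the `iter ++ iter2` part, here is a minimal sketch in plain Scala (no Spark) of what `++` does on two iterators with this record shape. The object name `IteratorConcatSketch`, the `concat` helper, and the sample records are made up for illustration. It only shows the behaviour of the standard library's `Iterator.++`, not how Spark pairs up or schedules partitions.

```scala
object IteratorConcatSketch {
  // Same shape as the records in the question.
  type Key = (String, String)
  type Value = (String, Long, Long, Long, Long)
  type Record = (Key, Value)

  // The function body passed to zipPartitions in the question.
  // Iterator.++ lazily yields every element of `left`, then every element of `right`.
  def concat(left: Iterator[Record], right: Iterator[Record]): Iterator[Record] =
    left ++ right

  def main(args: Array[String]): Unit = {
    // Hypothetical sample data: one duplicated record and keys out of sorted order.
    val left = Seq[Record](
      (("k2", "x"), ("a", 1L, 1L, 1L, 1L)),
      (("k1", "y"), ("b", 2L, 2L, 2L, 2L)))
    val right = Seq[Record](
      (("k2", "x"), ("a", 1L, 1L, 1L, 1L)))

    val merged = concat(left.iterator, right.iterator).toList

    // The result is the plain concatenation: duplicates are kept and
    // elements stay in their original order (no sorting, no re-hashing).
    println(merged == (left ++ right).toList)
    merged.foreach(println)
  }
}
```

So at the level of a single pair of partitions, the concatenation itself should not be altering or dropping values; if the results differ, the cause would have to lie in which partitions get paired or in what Spark assumes about the result afterwards.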
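Is zipPartitions implicitly doing anything extra, like a comparison sort or re-hashing things, or in general implementing the merge differently than union does?
Will union and zipPartitions return different results if the RDDs contain non-distinct rows, or multiple copies of keys, or have empty partitions, or hash collisions of the keys, or any other such issues?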
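To make the question concrete, this is roughly the kind of small local check I mean. It is a minimal sketch, assuming Spark is on the classpath and run in local mode; the object name `UnionVsZipPartitionsSketch`, the helpers `multiset` and `compare`, the partition count of 4, and the toy data are all made up for illustration and are not taken from my real job.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object UnionVsZipPartitionsSketch {
  // Count occurrences so the two results can be compared as multisets,
  // ignoring the order in which records come back.
  def multiset[T](xs: Seq[T]): Map[T, Int] =
    xs.groupBy(identity).map { case (k, v) => k -> v.size }

  // Returns (same records?, partitions after union, partitions after zipPartitions).
  def compare(sc: SparkContext): (Boolean, Int, Int) = {
    val partitioner = new HashPartitioner(4)
    // Toy data: duplicate rows, repeated keys, and fewer keys than partitions,
    // so some partitions are empty.
    val rdd1 = sc.parallelize(Seq(("a", 1L), ("a", 1L), ("b", 2L))).partitionBy(partitioner)
    val rdd2 = sc.parallelize(Seq(("a", 1L), ("c", 3L))).partitionBy(partitioner)

    val viaUnion = rdd1.union(rdd2)
    val viaZip = rdd1.zipPartitions(rdd2, preservesPartitioning = true)(
      (it1, it2) => it1 ++ it2)

    val same = multiset(viaUnion.collect().toSeq) == multiset(viaZip.collect().toSeq)
    (same, viaUnion.partitions.length, viaZip.partitions.length)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("union-vs-zipPartitions")
    val sc = new SparkContext(conf)
    try println(compare(sc))
    finally sc.stop()
  }
}
```

A check like this only compares record multisets and partition counts on tiny inputs that share one partitioner. It says nothing about speed, and it does not reproduce the behaviour I see at scale, which is why I am asking about the implementation.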
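Yes, I could run tests myself (in fact, I have been doing so for almost 2 days!), so please don't post anything silly asking whether I have tried such-and-such. I am asking this question to better understand what is happening under the covers at the code level. Was "union" written as a special case of "zipPartitions"?
Later edit: adding some examples with toDebugString results, as recommended by @Holden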
val tempIntermediateRDD6 = tempIntermediateRDD1.
zipPartitions(tempIntermediateRDD2, true)((iter, iter2) => iter++iter2).
zipPartitions(tempIntermediateRDD5, true)((iter, iter2) => iter++iter2).
partitionBy(partitioner).
setName("tempIntermediateRDD6").
persist(StorageLevel.MEMORY_AND_DISK_SER)
tempIntermediateRDD6.checkpoint
println(tempIntermediateRDD6.toDebugString)
// (2568) tempIntermediateRDD6 ZippedPartitionsRDD2[169] at zipPartitions at mycode.scala:3203 [Disk Memory Serialized 1x Replicated]
// | ZippedPartitionsRDD2[168] at zipPartitions at mycode.scala:3202 [Disk Memory Serialized 1x Replicated]
// | tempIntermediateRDD1 ShuffledRDD[104] at partitionBy at mycode.scala:2824 [Disk Memory Serialized 1x Replicated]
// | CachedPartitions: 2568; MemorySize: 200.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
// | CheckpointRDD[105] at count at mycode.scala:2836 [Disk Memory Serialized 1x Replicated]
// | tempIntermediateRDD2 ShuffledRDD[116] at partitionBy at mycode.scala:2900 [Disk Memory Serialized 1x Replicated]
// | CheckpointRDD[117] at count at mycode.scala:2912 [Disk Memory Serialized 1x Replicated]
// | tempIntermediateRDD5 MapPartitionsRDD[163] at distinct at mycode.scala:3102 [Disk Memory Serialized 1x Replicated]
// | CachedPartitions: 2568; MemorySize: 550.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
// | CheckpointRDD[164] at count at mycode.scala:3113 [Disk Memory Serialized 1x Replicated]
versus:
val tempIntermediateRDD6 = tempIntermediateRDD1.
union(tempIntermediateRDD2).
union(tempIntermediateRDD5).
partitionBy(partitioner).
setName("tempIntermediateRDD6").
persist(StorageLevel.MEMORY_AND_DISK_SER)
tempIntermediateRDD6.checkpoint
println(tempIntermediateRDD6.toDebugString)
// (2568) tempIntermediateRDD6 ShuffledRDD[170] at partitionBy at mycode.scala:3208 [Disk Memory Serialized 1x Replicated]
// +-(5136) UnionRDD[169] at union at mycode.scala:3207 [Disk Memory Serialized 1x Replicated]
// | PartitionerAwareUnionRDD[168] at union at mycode.scala:3206 [Disk Memory Serialized 1x Replicated]
// | tempIntermediateRDD1 ShuffledRDD[104] at partitionBy at mycode.scala:2824 [Disk Memory Serialized 1x Replicated]
// | CachedPartitions: 2568; MemorySize: 200.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
// | CheckpointRDD[105] at count at mycode.scala:2836 [Disk Memory Serialized 1x Replicated]
// | tempIntermediateRDD2 ShuffledRDD[116] at partitionBy at mycode.scala:2900 [Disk Memory Serialized 1x Replicated]
// | CheckpointRDD[117] at count at mycode.scala:2912 [Disk Memory Serialized 1x Replicated]
// | tempIntermediateRDD5 MapPartitionsRDD[163] at distinct at mycode.scala:3102 [Disk Memory Serialized 1x Replicated]
// | CachedPartitions: 2568; MemorySize: 550.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
// | CheckpointRDD[164] at count at mycode.scala:3113 [Disk Memory Serialized 1x Replicated]
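One difference I have seen is that the RDD types these commands produce are different: in the pipelines above, the union() version ends up as a ShuffledRDD (on top of a UnionRDD), whereas the zipPartitions() version ends up as a ZippedPartitionsRDD2. Later in the program I noticed differences in certain operations (such as reduceByKey) on these RDD types, so I would like to know what the difference between them is.
See also https://issues.apache.org/jira/browse/SPARK-10493 for some additional background and discussion of why I am asking this question.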