2015-09-09 82 views
3

我試圖聯合到已經在我們的集羣上的鍵散列分區分佈RDDS。我不需要保留任何順序或甚至分區,我只是希望工會盡可能快。在這個例子中,我確實需要所有記錄,而不僅僅是不同的記錄,但保持多樣性。Apache Spark RDD的union和zipPartition有什麼區別?

這裏是什麼,我會簡單地使用:

val newRDD = tempRDD1.union(tempRDD2) 

這裏是別人推薦給我的印象是速度更快,因爲它利用的RDDS如何已經劃分和分配:

val newRDD = tempRDD1.zipPartitions(tempRDD2, preservesPartitioning=true)((iter, iter2) => iter++iter2) 

哪個更快?結果是否完全一致,會員方面?

我問這個,因爲直到現在我還以爲這些方法是等價的,但是當我提高了我的數據量和分區數量,執行程序,內存等等時,我得到了奇怪的zipPartitions方法結果,之後不能正確使用reduceByKey。或許我的差異是由於我的RDD本身,它們的形式是((String,String),(String,Long,Long,Long,Long)),所以也許iter ++ iter2除了工會之外還做了其他的事情那些價值?

是zipPartitions隱含做任何額外的東西,像一個比較排序,或重新散列東西,或一般不同實施合併比工會嗎?

如果RDD包含非不同行,或多個密鑰副本,或者存在空分區或密鑰的散列衝突或任何其他此類問題,union-vs-zipPartitions會返回不同的結果嗎?

是的,我可以運行測試自己(其實,我已經這樣做了近2天了!),所以請不要發佈任何愚蠢的事問我,如果我已經試過這樣的,和這樣的.. 。我正在問這個問題,以便更好地理解代碼級下的情況。是否將「union」書寫爲「zipPartitions」的子項?

後來編輯:加入與toDebugString結果一些例子,所推薦的@Holden

val tempIntermediateRDD6 = tempIntermediateRDD1. 
    zipPartitions(tempIntermediateRDD2, true)((iter, iter2) => iter++iter2). 
    zipPartitions(tempIntermediateRDD5, true)((iter, iter2) => iter++iter2). 
    partitionBy(partitioner). 
    setName("tempIntermediateRDD6"). 
    persist(StorageLevel.MEMORY_AND_DISK_SER) 

tempIntermediateRDD6.checkpoint 

println(tempIntermediateRDD6.toDebugString) 

// (2568) tempIntermediateRDD6 ZippedPartitionsRDD2[169] at zipPartitions at mycode.scala:3203 [Disk Memory Serialized 1x Replicated] 
// | ZippedPartitionsRDD2[168] at zipPartitions at mycode.scala:3202 [Disk Memory Serialized 1x Replicated] 
// | tempIntermediateRDD1 ShuffledRDD[104] at partitionBy at mycode.scala:2824 [Disk Memory Serialized 1x Replicated] 
// |  CachedPartitions: 2568; MemorySize: 200.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B 
// | CheckpointRDD[105] at count at mycode.scala:2836 [Disk Memory Serialized 1x Replicated] 
// | tempIntermediateRDD2 ShuffledRDD[116] at partitionBy at mycode.scala:2900 [Disk Memory Serialized 1x Replicated] 
// | CheckpointRDD[117] at count at mycode.scala:2912 [Disk Memory Serialized 1x Replicated] 
// | tempIntermediateRDD5 MapPartitionsRDD[163] at distinct at mycode.scala:3102 [Disk Memory Serialized 1x Replicated] 
// |  CachedPartitions: 2568; MemorySize: 550.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B 
// | CheckpointRDD[164] at count at mycode.scala:3113 [Disk Memory Serialized 1x Replicated] 

與:

val tempIntermediateRDD6 = tempIntermediateRDD1. 
    union(tempIntermediateRDD2). 
    union(tempIntermediateRDD5). 
    partitionBy(partitioner). 
    setName("tempIntermediateRDD6"). 
    persist(StorageLevel.MEMORY_AND_DISK_SER) 

tempIntermediateRDD6.checkpoint 

println(tempIntermediateRDD6.toDebugString) 

// (2568) tempIntermediateRDD6 ShuffledRDD[170] at partitionBy at mycode.scala:3208 [Disk Memory Serialized 1x Replicated] 
// +-(5136) UnionRDD[169] at union at mycode.scala:3207 [Disk Memory Serialized 1x Replicated] 
//  | PartitionerAwareUnionRDD[168] at union at mycode.scala:3206 [Disk Memory Serialized 1x Replicated] 
//  | tempIntermediateRDD1 ShuffledRDD[104] at partitionBy at mycode.scala:2824 [Disk Memory Serialized 1x Replicated] 
//  |  CachedPartitions: 2568; MemorySize: 200.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B 
//  | CheckpointRDD[105] at count at mycode.scala:2836 [Disk Memory Serialized 1x Replicated] 
//  | tempIntermediateRDD2 ShuffledRDD[116] at partitionBy at mycode.scala:2900 [Disk Memory Serialized 1x Replicated] 
//  | CheckpointRDD[117] at count at mycode.scala:2912 [Disk Memory Serialized 1x Replicated] 
//  | tempIntermediateRDD5 MapPartitionsRDD[163] at distinct at mycode.scala:3102 [Disk Memory Serialized 1x Replicated] 
//  |  CachedPartitions: 2568; MemorySize: 550.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B 
//  | CheckpointRDD[164] at count at mycode.scala:3113 [Disk Memory Serialized 1x Replicated] 
+0

一個區別我已經看到的是,這些命令返回的RDD類型是不同的:工會()返回一個ShuffledRDD,而zipPartitions()返回一個ZippedPartitionsRDD2。我稍後在程序中注意到這些RDD類型的某些操作(例如reduceByKey)的差異,所以我想知道這些RDD類型有什麼區別。 –

+0

另請參見https://issues.apache。org/jira/browse/SPARK-10493的一些額外的背景和討論,爲什麼我問這個問題 –

回答

4

聯盟返回一個專門UnionRDD,我們可以看到它是怎麼寫的在Spark項目中查看UnionRDD.scala。看着它,我們可以看到,Union使用的代碼塊實際執行:

override def getPartitions: Array[Partition] = { 
    val array = new Array[Partition](rdds.map(_.partitions.length).sum) 
    var pos = 0 
    for ((rdd, rddIndex) <- rdds.zipWithIndex; split <- rdd.partitions) { 
     array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index) 
     pos += 1 
    } 
    array 
    } 

如果你好奇,什麼底層的計算看起來像一個RDD我推薦使用上造成的RDDS功能toDebugString你可以看到依賴DAG的樣子。

+0

謝謝,這是部分有用的。我已經使用過toDebugString了,但是對於我的RDD來說,它並沒有吐出看到幕後發生的事情所需的詳細程度。我將在上面編輯我的問題,併發佈一個帶有結果的示例.DebugStrings –

+0

Yah toDebugString並不總是足夠的,但是您可以經常使用那個+來查看所引用的類,以瞭解發生了什麼。 – Holden

相關問題