2017-10-16 81 views
0

說,我有some_col加入兩個具有相同分區的DataFrame後,哪些屬性具有Spark DataFrame?

df_1 = df_1.repartition(50, 'some_col') 
df_2 = df_2.repartition(50, 'some_col') 

df_3 = df_1.join(df_2, on='some_col') 

2個星火DataFrames我認爲df_3應該由some_col也劃分,有50個分區,但我的實驗表明,至少在過去的條件是不正確的。爲什麼會發生?

在耗時的操作(重新分區或重新定位)的條款,會有什麼後

df_3 = df_3.repartition(50, 'some_col') 
+0

爲什麼你認爲'df_3'應該有50個分區?在加入後什麼說'df_3.rdd.getNumPartitions()'? – Mariusz

+0

我發現'df_3.rdd.getNumPartitions()'等於'spark.default.parallelism'。它看起來很奇怪..爲什麼會發生?在同一分區內進行連接看起來要容易得多! – AlexanderLedovsky

+0

它總是等於'spark.sql.shuffle.partitions',你可以在這裏閱讀更多:https://stackoverflow.com/questions/41359344/why-is-the-number-of-partitions-after-groupby-200 - 爲什麼這200個沒有 - 其他 - – Mariusz

回答

2

條件「df_3應該由some_col也劃分,有50個分區」可能發生只會是如果df_1和df_2具有與「some_col」相同值的分區,即如果df_1有2個分區:[(1,2)],[(3,1),(3,7)](例如some_col值爲1,3),那麼df_2需要具有some_col值爲1,3的分區。如果是這種情況,那麼在加入df_1和df_2時,它將生成與df_1或df_2中分區數相同的df_3。

在所有其他情況下,它將嘗試創建一個默認的200分區並對整個聯接操作進行隨機播放。

爲清楚起見,你可以試試下面的例子:

rdd1 = sc.parallelize([(1,2), (1,9), (2, 3), (3,4)]) 
df1 = rdd1.toDF(['a', 'b']) 
df1 = df1.repartition(3, 'a') 
df1.rdd.glom().collect() #outputs like: 
>> [[Row(a=2,b=3)], [Row(a=3,b=4)], [Row(a=1,b=2), Row(a=1,b=9)]] 

df1.rdd.getNumPartitions() 
>>3 

rdd2 = sc.parallelize([(1,21), (1,91), (2, 31), (3,41)]) 
df2 = rdd2.toDF(['a', 'b']) 
df2 = df2.repartition(3, 'a') 
df2.rdd.glom().collect() #outputs like: 
>> [[Row(a=2,b=31)], [Row(a=3,b=41)], [Row(a=1,b=21), Row(a=1,b=91)]] 

df2.rdd.getNumPartitions() 
>>3 


df3 = df1.join(df2, on='a') 
df3.rdd.glom().collect() #outputs like: 
>> [[Row(a=2,b=3,b=31)], [Row(a=3,b=4,b=41)], [Row(a=1,b=2,b=21), Row(a=1,b=9,b=91)]] 
df21.rdd.getNumPartitions() 
>>3 
+0

Thx!其實現在更清楚了。我是對的,如果df1和df2中的some_col的值不同(例如[1,3]和[1,2,3]),它總是會導致洗牌? – AlexanderLedovsky

相關問題