2017-06-13 42 views
0

我使用Window.sum函數來獲取RDD中的值的總和,但是當我將DataFrame轉換爲RDD時,我發現結果只有一個分區。重新分區何時發生? ?當我在窗口中使用partitionBy時,爲什麼我用spark/scala得到不同的結果?

val rdd = sc.parallelize(List(1,3,2,4,5,6,7,8), 4) 
    val df = rdd.toDF("values"). 
     withColumn("csum", sum(col("values")).over(Window.orderBy("values"))) 
    df.show() 
    println(s"numPartitions ${df.rdd.getNumPartitions}") 
    // 1 
    //df is: 
// +------+----+ 
// |values|csum| 
// +------+----+ 
// |  1| 1| 
// |  2| 3| 
// |  3| 6| 
// |  4| 10| 
// |  5| 15| 
// |  6| 21| 
// |  7| 28| 
// |  8| 36| 
// +------+----+ 

我添加partitionBy在窗口,但結果是錯誤,我應該怎麼做,這是我改變代碼:

 val rdd=sc.parallelize(List(1,3,2,4,5,6,7,8),4) 
     val sqlContext = new SQLContext(m_sparkCtx) 
     import sqlContext.implicits._ 
     val df = rdd.toDF("values").withColumn("csum", sum(col("values")).over(Window.partitionBy("values").orderBy("values"))) 
     df.show() 
     println(s"numPartitions ${df.rdd.getNumPartitions}") 
     //1 
//df is: 
// +------+----+ 
// |values|csum| 
// +------+----+ 
// |  1| 1| 
// |  6| 6| 
// |  3| 3| 
// |  5| 5| 
// |  4| 4| 
// |  8| 8| 
// |  7| 7| 
// |  2| 2| 
// +------+----+ 
+0

我剛剛在你的其他問題中回答了這個問題。 :) –

回答

0

Window功能有partitionBy API用於分組的dataframeorderBy訂購按升序或降序分組的行。

在您的第一個案例中,您尚未定義partitionBy,因此所有值都歸入一個dataframe以進行排序,從而將數據混合到一個分區中。

但在第二種情況下,您自己在values上定義了partitionBy。因此,由於每個值都是不同的,因此每一行都被分組爲單個組。

在第二種情況下的分區是200,因爲這是當你還沒有定義分區和洗牌發生

要從第二種情況下得到相同的結果與第一個情況下,應該spark定義的默認分區,您需要將您的dataframe與第一種情況一樣分組到一個組中。爲此,您需要創建另一個具有常數值的column,並將該值用於partitionBy

0

當創建列作爲
withColumn("csum", sum(col("values")).over(Window.orderBy("values")))

因爲還沒有定義partitionBy()方法來定義分區的Window.orderBy("values")被排序在單個分區列「值」的值。

這是從初始4改變partition數爲1

分區200是在第二個情況下,由於partitionBy()方法使用200默認分區。如果你需要的分區數爲4,你可以使用像repartition(4)coalesce(4)

方法希望你明白了!

相關問題