2017-09-15 40 views
0

我有以下形式的數據幀:正比於如何(同樣)分區陣列的數據在火花數據幀

import scala.util.Random 
val localData = (1 to 100).map(i => (i,Seq.fill(Math.abs(Random.nextGaussian()*100).toInt)(Random.nextDouble))) 
val df = sc.parallelize(localData).toDF("id","data") 

|-- id: integer (nullable = false) 
|-- data: array (nullable = true) 
| |-- element: double (containsNull = false) 


df.withColumn("data_size",size($"data")).show 

+---+--------------------+---------+ 
| id|    data|data_size| 
+---+--------------------+---------+ 
| 1|[0.77845301260182...|  217| 
| 2|[0.28806915178410...|  202| 
| 3|[0.76304121847720...|  165| 
| 4|[0.57955190088558...|  9| 
| 5|[0.82134215959459...|  11| 
| 6|[0.42193739241567...|  57| 
| 7|[0.76381645621403...|  4| 
| 8|[0.56507523859466...|  93| 
| 9|[0.83541853717244...|  107| 
| 10|[0.77955626749231...|  111| 
| 11|[0.83721643562080...|  223| 
| 12|[0.30546029947285...|  116| 
| 13|[0.02705462199952...|  46| 
| 14|[0.46646815407673...|  41| 
| 15|[0.66312488908446...|  16| 
| 16|[0.72644646115640...|  166| 
| 17|[0.32210572380128...|  197| 
| 18|[0.66680355567329...|  61| 
| 19|[0.87055594653295...|  55| 
| 20|[0.96600507545438...|  89| 
+---+--------------------+---------+ 

現在我想應用昂貴的UDF,時間爲計算是〜數據數組的大小。我wodner如何重新分區我的數據,使每個分區具有大致相同數量的「records * data_size」(即數據點不僅僅是記錄)。

如果只是做df.repartition(100),我可能會得到一些包含一些非常大的數組的分區,這些數組就是整個spark階段的瓶頸(所有其他taks已經完成)。如果當然,我可以選擇一個瘋狂的分區數量,這將(幾乎)確保每個記錄都在一個單獨的分區中。但還有另一種方式嗎?

回答

0

正如你所說,你可以增加分區的數量。我通常使用核心數量的倍數:火花上下文默認並行度* 2-3 ..
對於您的情況,您可以使用更大的乘數。

另一種解決方案是將篩選以這種方式分割你的DF:

  • DF只有更大的陣列
  • DF,其餘

然後,您可以重新分區的每個人,執行計算並將它們聯合回來。

請注意,重新分區可能會很昂貴,因爲您需要大量的行來洗牌。

你可以看看論文的幻燈片(27+):https://www.slideshare.net/SparkSummit/custom-applications-with-sparks-rdd-spark-summit-east-talk-by-tejas-patil

他們正在經歷非常糟糕的數據偏差,不得不處理它以有趣的方式。

+0

在我的情況下,將數據幀分成小/大記錄的想法可能就足夠了。 –