2017-10-04 18 views
2

我在Apache Spark上運行「連接」操作,看到沒有弱的可伸縮性。如果有人能解釋這一點,將不勝感激。我創建了兩個數據框(「a」,「b」)和(「a」,「c」),並通過第一列連接數據框。我爲「一對一」連接生成數據幀值。另外,我使用相同的分區程序來避免混洗。Apache Spark連接操作的弱擴展性差

數據框中的行數 - 1024 * 1024 * 16 * cores_total(cores_total - 啓動程序的核心總數)。 列「a」由隨機Int值組成,所有「b」列的值等於1,「c」列的所有值等於2.

理論上,隨着數據大小和內核到64倍,執行時間應該保持不變,但執行時間稍微增加。我得到下面的執行時間:

enter image description here

Apache的版本星火 - 2.1.0。我們使用8個集羣節點,配備1 Gbit以太網,每個節點有2個Intel Xeon E5-2630,64 GB RAM。

/* join perf */ 
import scala.io.Source 
import scala.math._ 
import org.apache.spark._ 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark.SparkConf 
import scala.util.control.Breaks._ 
import scala.collection.mutable._ 
import org.apache.spark.rdd._ 
import org.apache.spark.sql._ 
import scala.util.Random 
import org.apache.spark.util.SizeEstimator 
import org.apache.spark.HashPartitioner 

object joinPerf { 

    def get_array(n: Int): Array[Int] = { 
     var res = Array[Int]() 
     for (x <- 1 to n) { 
      res :+= Random.nextInt 
     } 

     return res 
    } 

    def main(args: Array[String]) { 
     val start_time = System.nanoTime 
     val conf = new SparkConf().setAppName("joinPerf") 
     val sc = new SparkContext(conf) 
     val cores_total = sc.getConf.get("spark.cores.max").toInt 
     val partitions_total = sc.getConf.get("spark.default.parallelism").toInt 
     val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
     import sqlContext._ 
     import sqlContext.implicits._ 
     println("start") 
     val elems_total = 1024 * 1024 * 16 * cores_total 
     val start_cnt = 1024 * 1024 
     Random.setSeed(785354) 

     var vals = Vector[Int]() 
     for (x <- 1 to start_cnt) { 
      vals :+= Random.nextInt 
     } 

     var test_rdd = sc.parallelize(vals) 
     println(test_rdd.count) 
     test_rdd = test_rdd.flatMap(x => get_array(elems_total/start_cnt)).distinct 

     println("test_rdd count = " + test_rdd.count) 
     println("partitions count = " + test_rdd.getNumPartitions) 

     var test_rdd1 = test_rdd.map(x => (x, 1)).toDF("a", "b").repartition(partitions_total, $"a").cache 
     var test_rdd2 = test_rdd.map(x => (x, 2)).toDF("a", "c").repartition(partitions_total, $"a").cache 

     println("test_rdd1 count = " + test_rdd1.count) 
     println("test_rdd2 count = " + test_rdd2.count) 

     var start_test_time = System.nanoTime 
     var test_res = test_rdd1.join(test_rdd2, test_rdd1("a") === test_rdd2("a")) 
     println(test_res.count) 
     print("join time = ") 
     println((System.nanoTime - start_test_time)/1e9d + " sec. ") 

     print("all time = ") 
     println((System.nanoTime - start_time)/1e9d + " sec. ") 
     sc.stop() 
    } 
} 

配置參數:每芯

spark.serializer     org.apache.spark.serializer.KryoSerializer 
spark.kryoserializer.buffer.max 1024 
spark.kryo.unsafe    true 
spark.kryo.referenceTracking  false 
spark.driver.memory    22g 
spark.executor.memory   22g 
spark.driver.maxResultSize  22g 
spark.rpc.message.maxSize  2047 
spark.memory.fraction   0.8 
spark.memory.storageFraction  0.5 
spark.executor.extraJavaOptions "-XX:+UseParallelGC" 

分區 - 4.

啓動程序的實施例:

./bin/spark-submit --class "joinPerf" --conf spark.executor.cores=8 --conf spark.cores.max=64 --conf spark.default.parallelism=256 ./joinPerf.jar 

回答

1

理論上,與數據尺寸的增加核心64倍,執行時間應該是rem ain相同,但執行時間略有增長

它不應該。雖然人們可以預期線性可伸縮性(假設沒有IO瓶頸),但在嚴格執行均勻分佈數據的本地操作時,當轉換需要數據交換時(RDD洗牌,DatasetExchange),情況就不會如此。在廣泛的轉型中,joins屬於最昂貴的類別(接下來是groupByKey類似的業務),由於它們的非還原性質以及大型本地支持集合的使用。

隨機洗牌不僅具有高於線性複雜度(至少爲基於排序的方法),還會導致數據分佈不均勻,並且需要大量的磁盤和網絡IO。

這是即使在你的代碼,它慢騰騰兩次數據的情況下,更嚴重的 - 一旦重新分區RDDs而一旦joinDatasetsHashPartitionerRDDs不兼容Dataset分區)。

最後,增加簇大小有其自身的性能影響,與通信和同步開銷增加以及數據局部性降低有關。

總的來說,您很少會看到真正的線性可伸縮性,即使您這樣做,您也可以預計斜率爲< 1。

在附註上,我不會依賴cache - countDatasets一起使用時的習慣用法。 It is likely to be unreliable。請參閱Spark: Inconsistent performance number in scaling number of cores