2016-03-15 47 views
0

Spark(1.5.2)中的分區計數在某些sql查詢中爆炸。Spark sql查詢導致分區計數膨脹

這可以避免嗎?



在我來說,我有三個表上,我執行下面的查詢(文本,業主,人):

sqlContext.sql(
       "SELECT t.* FROM texts t JOIN ("+ 
         "SELECT o.TextId FROM "+ 
         "owners o JOIN ("+ 
         "SELECT UserId FROM person WHERE LOWER(name) "+ 
         "RLIKE '.*"+escapeRegex(filter.name)+"\\s*$'"+ 
         ") p ON (o.UserId=p.UserId) GROUP BY o.TextId"+ 
         ") o "+ 
         "ON (t.TextId = o.TextId)") 

查詢之前的分區數爲2,之後它的200使用textsDF.javaRDD().partitions().size()獲得

+0

嗨喬納森,你得到你在找什麼? – Srini

+0

Jonathan,默認爲200.請檢查此鏈接http://spark.apache.org/docs/latest/sql-programming-guide.html並搜索該屬性。 – Srini

+0

爲了避免默認值,你應該在你的代碼中設置你自己的屬性,因爲我建議降低值(到2或4)。然後你會得到較小的分區。 – Srini

回答

1

Join/Group或任何具有隨機操作的分區的數量取決於屬性「spark.sql.shuffle.partitions」。在羣集配置中,這必須已設置爲200。

該屬性的重要性:這決定了數據上reducer(種類,理解)操作的數量。通過將這個屬性設置得更高,可以確保有很好的並行性。

任何如何,您可以根據您的需要更改該屬性。您可以使用任何數字設置如下SparkConf。

conf.set("spark.sql.shuffle.partitions","2"); 

注意:將其設置爲較低會降低性能,這會增加網絡使用量和較少的並行性。

另一方面,文件讀取的並行性取決於默認的parallelism屬性,它告訴您hdfs數據中每個內核的任務數/塊的數量。但對於洗牌的任何操作,這取決於我提到的財產。

+0

'spark.sql.shuffle.partitions'不是由我的代碼設置的,也不是在環境中設置的。我使用'.setMaster(「local [4]」)'初始化我的配置。儘管如此,sqlContext.read()。json(...)只創建2個分區。 這就是爲什麼我在'DataFrame'上使用'.repartition(4)'。第一個查詢不改變分區計數。之後的第二個查詢將計數值更改爲200.在當前項目中,使用多個分區會導致可怕的性能(即使我在非本地運行),因爲每個分區都有很大的初始化開銷(在'.mapPartitions()' )。 – Jonathan

+0

Jonathan,默認爲200.請查看此鏈接http://spark.apache.org/docs/latest/sql-programming-guide.html並搜索該屬性。 – Srini

+0

是的,這是有效的。謝謝!我仍然對這種行爲感到很驚訝。我會期望它使用已經存在的線程/分區進行混洗,或者使用一個合理的系統相關常量,比如核心數量或setMaster(「local [。]」)中聲明的核心數量。 200似乎是一個非常武斷的選擇。 – Jonathan