2016-01-21 73 views
9

我已經創建了兩個來自Hive表(PC_ITM和ITEM_SELL)並且大小很大的數據幀,並且我經常在SQL查詢中使用那些 通過註冊爲表。但由於那些大,因此我花了很多時間獲取查詢結果。因此,我將它們保存爲parquet文件,然後將它們讀取並註冊爲臨時表。但仍然得不到良好性能,因此我已廣播了這些數據幀,然後註冊了它們如下表所示。如何訪問Spark中的廣播數據幀

PC_ITM_DF=sqlContext.parquetFile("path") 
val PC_ITM_BC=sc.broadcast(PC_ITM_DF) 
val PC_ITM_DF1=PC_ITM_BC 
PC_ITM_DF1.registerAsTempTable("PC_ITM") 

ITM_SELL_DF=sqlContext.parquetFile("path") 
val ITM_SELL_BC=sc.broadcast(ITM_SELL_DF) 
val ITM_SELL_DF1=ITM_SELL_BC.value 
ITM_SELL_DF1.registerAsTempTable(ITM_SELL) 


sqlContext.sql("JOIN Query").show 

但是我還是無法實現它與這些數據幀未被廣播的時間相同的性能。

誰能告訴如果這是廣播和使用它的正確方法?`

回答

0

我會緩存RDDS在內存中。下一次需要時,spark會從內存中讀取RDD,而不是每次都從頭開始生成RDD。這裏有一個快速入門的鏈接docs

val PC_ITM_DF = sqlContext.parquetFile("path") 
PC_ITM_DF.cache() 
PC_ITM_DF.registerAsTempTable("PC_ITM") 

val ITM_SELL_DF=sqlContext.parquetFile("path") 
ITM_SELL_DF.cache() 
ITM_SELL_DF.registerAsTempTable("ITM_SELL") 
sqlContext.sql("JOIN Query").show 

rdd.cache()是rdd.persist(StorageLevel.MEMORY_ONLY)的簡寫。有幾種持久性級別可以選擇,因爲您的數據太大而無法使用內存持久性。這裏是一個list of persistence options.如果你想從緩存中手動刪除RDD,你可以撥打rdd.unpersist()

如果您喜歡廣播數據。在播放之前,您必須先將其收集在驅動程序中。這要求您的RDD適合您的驅動程序(和執行程序)的內存。

+1

這不回答原來的問題,這就是如何廣播數據幀。如果您不止一次加載它,即重複使用,堅持只會有所幫助。加入兩個分佈式數據集時無助於此。 –

+0

@KirkBroadhurst他指出數據很大並且經常使用 –

+0

@AlexNaspo稍微偏離原始問題:'RDD適合內存',因此這意味着我無法播放數據,直到我可以將其收集到驅動程序的主內存中?我通常使用自己的筆記本電腦作爲大型集羣上的驅動程序和主設備/從設備。那麼這是我很快可能面臨的限制嗎? –

10

你並不需要'訪問'廣播數據幀 - 你只是使用它,而Spark將在後臺實現廣播。 broadcast function很好地工作,並且更有意義的是sc.broadcast方法。

如果您一次評估所有內容,可能很難理解在哪裏花費時間。

您可以將代碼分解爲多個步驟。這裏的關鍵是執行一個動作,並在您將用於連接之前繼續使用您想要廣播的數據幀

// load your dataframe 
PC_ITM_DF=sqlContext.parquetFile("path") 

// mark this dataframe to be stored in memory once evaluated 
PC_ITM_DF.persist() 

// mark this dataframe to be broadcast 
broadcast(PC_ITM_DF) 

// perform an action to force the evaluation 
PC_ITM_DF.count() 

這樣做將確保數據幀是

  • 加載到內存中(持續)
  • 註冊爲臨時表用於您的SQL查詢
  • 標記爲廣播,這樣會運送給所有執行者

當您現在運行sqlContext.sql("JOIN Query").show時,您應該看到'broadc ast hash join「放在Spark UI的SQL選項卡中。

+1

廣播RDD有什麼好處? RDD代表彈性分佈式數據集。廣播消除了RDD的分佈式特性。我可以看到將RDD的數據收集到內存中並進行廣播的用例。我不相信這是可能的。如果你看看這篇文章(http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love它說:「要廣播一個RDD,你需要首先在驅動程序節點上收集()它。」你有沒有在實踐或測試中使用過這個? –

+4

@AlexNaspo是的,我一直都在使用它。好處是數據在所有節點上完全可用 - 不再分配 - 這可以提高加入時的性能。例如,考慮一個DataFrame,其中包含美國的每個人及其郵政編碼,然後是包含郵政編碼 - >州的表。加入這些需要大量的洗牌。將相對較小的zip->狀態數據幀廣播到所有節點,無需洗牌。 –

+1

您正在廣播保存在內存中的數據幀,而不是分佈式的。那是對的嗎? Spark建議在您的數據中添加一個分區器以減少加入時的隨機數量。 @kirkbroadhurt –

0

此時您無法訪問SQL查詢中的廣播數據幀。您只能通過數據幀使用已展開的數據幀。

參見:https://issues.apache.org/jira/browse/SPARK-16475

+0

現在的解決方案是首先在dataframe api中廣播df或表,然後註冊從'broadcast'返回的值函數作爲臨時表,然後在SQL查詢中調用該臨時表。 – piggybox