2016-12-27 32 views
0

我使用spark1.6。我試圖廣播RDD,但不確定如何訪問數據幀中的廣播變量?如何在數據幀中引用廣播變量

我有兩個數據框員工&部門。

員工數據幀

------------------- 
Emp Id | Emp Name | Emp_Age 
------------------ 
1 | john | 25 

2 | David | 35 

部數據幀

-------------------- 
Dept Id | Dept Name | Emp Id 
----------------------------- 
1 | Admin | 1 

2 | HR | 2 

import scala.collection.Map 

val df_emp = hiveContext.sql("select * from emp") 

val df_dept = hiveContext.sql("select * from dept") 

val rdd = df_emp.rdd.map(row => (row.getInt(0),row.getString(1))) 

val lkp = rdd.collectAsMap() 

val bc = sc.broadcast(lkp) 

print(bc.value.get(1).get) 

--Below statement doesn't work 

val combinedDF = df_dept.withColumn("emp_name",bc.value.get($"emp_id").get) 
  1. 如何參照上述combinedDF聲明廣播變量?
  2. 如何處理如果lkp沒有返回任何值?
  3. 有沒有辦法從lkp中返回多個記錄(讓我們假設在查找時是否有2條記錄for emp_id = 1,我想要得到兩條記錄)
  4. 如何返回多個值廣播......(EMP_NAME & emp_age)

回答

0

如何參照上述combinedDF聲明廣播變量?

使用udf。如果emp_idInt

val f = udf((emp_id: Int) => bc.value.get(emp_id)) 

df_dept.withColumn("emp_name", f($"emp_id")) 

如何處理,如果LKP不返回任何價值?

如上圖所示

不要使用get有沒有辦法從LKP

使用groupByKey返回多個記錄:

val lkp = rdd.groupByKey.collectAsMap() 

explode

df_dept.withColumn("emp_name", f($"emp_id")).withColumn("emp_name", explode($"emp_name")) 

或者只是跳過所有的步驟和broadcast

import org.apache.spark.sql.functions._ 

df_emp.join(broadcast(df_dep), Seq("Emp Id"), "left") 
+0

感謝您的答覆.....從查找返回多個記錄,我做了以下.... VAL LKP = rdd.groupByKey.collectAsMap()..... val bc = sc.broadcast(lkp)...... val f = udf((emp_id:Int)=> bc.value.get(emp_id)).. ....我得到java.lang.UnsupportedOperationException。 – Prasan

+0

此外,如果我查找有多個列,如何從廣播返回多個值...(emp_name&emp_age) – Prasan

+0

udf調用中存在一個錯誤,該錯誤是固定的。對於多值,最好使用廣播連接(最後一部分)。 – user7337271

相關問題