2014-10-06 80 views
12

我只想保留在第二個表中引用了部門ID的員工。基於Spark中的另一個RDD進行過濾

Employee table 
LastName DepartmentID 
Rafferty 31 
Jones 33 
Heisenberg 33 
Robinson 34 
Smith 34 

Department table 
DepartmentID 
31 
33 

我曾嘗試下面的代碼不工作:

employee = [['Raffery',31], ['Jones',33], ['Heisenberg',33], ['Robinson',34], ['Smith',34]] 
department = [31,33] 
employee = sc.parallelize(employee) 
department = sc.parallelize(department) 
employee.filter(lambda e: e[1] in department).collect() 

Py4JError: An error occurred while calling o344.__getnewargs__. Trace: 
py4j.Py4JException: Method __getnewargs__([]) does not exist 

任何想法?我在Python中使用Spark 1.1.0。但是,我會接受一個Scala或Python的答案。

+0

你需要你的部門名單是一個「中」 RDD? – maasg 2014-10-06 18:01:43

+0

不是。部門列表從HDFS加載,但不是很大。 – poiuytrez 2014-10-07 07:52:07

回答

19

在這種情況下,你想實現的是在與包含在部門表中的數據的每個分區過濾: 這將是基本的解決方案:

val dept = deptRdd.collect.toSet 
val employeesWithValidDeptRdd = employeesRdd.filter{case (employee, d) => dept.contains(d)} 

如果你的部門的數據是大的,廣播的變量將會被一次傳送數據到所有節點,而不必與每個任務

val deptBC = sc.broadcast(deptRdd.collect.toSet) 
val employeesWithValidDeptRdd = employeesRdd.filter{case (employee, d) => deptBC.value.contains(d)} 
0序列化提高性能

雖然使用連接可以工作,但這是一個非常昂貴的解決方案,因爲它需要分佈式數據的洗牌(byKey)來實現連接。鑑於該要求是一個簡單的過濾器,將數據發送到每個分區(如上所示)將提供更好的性能。

+0

赦免我如果我在這裏錯了,但不會partitionBy()通過鍵解決分佈式洗牌?並不是說它會解決加入問題更昂貴的問題,因爲我不這麼認爲,我只是指出加入並不需要100%的時間進行洗牌。 – TurnipEntropy 2018-02-19 02:42:08

10

我終於實現了一個使用連接的解決方案。我有一個0值添加到部門,以避免星火異常:

employee = [['Raffery',31], ['Jones',33], ['Heisenberg',33], ['Robinson',34], ['Smith',34]] 
department = [31,33] 
# invert id and name to get id as the key 
employee = sc.parallelize(employee).map(lambda e: (e[1],e[0])) 
# add a 0 value to avoid an exception 
department = sc.parallelize(department).map(lambda d: (d,0)) 

employee.join(department).map(lambda e: (e[1][0], e[0])).collect() 

output: [('Jones', 33), ('Heisenberg', 33), ('Raffery', 31)] 
0

過濾多列多個值:

在這種情況下,你是從數據庫中提取數據(蜂巢或者在這個例子中SQL類型的數據庫),並需要在多個列的過濾,則可能是更容易裝載表與第一過濾器,然後通過RDD迭代您的過濾器(多個小迭代是星火編程的鼓勵方式):

{ 
    import org.apache.spark.sql.hive.HiveContext 
    val hc = new HiveContext(sc) 

    val first_data_filter = hc.sql("SELECT col1,col2,col2 FROM tableName WHERE col3 IN ('value_1', 'value_2', 'value_3)") 
    val second_data_filter = first_data_filter.filter(rdd => rdd(1) == "50" || rdd(1) == "20") 
    val final_filtered_data = second_data_filter.filter(rdd => rdd(0) == "1500") 

} 

當然,你必須知道你的數據一點點地過濾對正確的值,但這是分析過程的一部分。

0

對於上面的同一個exm,我只想保留包含或在第二個表中引用的部門ID的僱員。 但它必須沒有連接操作,我會看到它在「包含」或「」, 我的意思是33的334和335

employee = [['Raffery',311], ['Jones',334], ['Heisenberg',335], ['Robinson',34], ['Smith',34]] 
department = [31,33] 
employee = sc.parallelize(employee) 
department = sc.parallelize(department) 
相關問題