2017-07-24 48 views
1

我只想找出兩個不同JSON文件中的女員工,只選擇我們感興趣的字段並將輸出寫入另一個JSON。在Google雲端平臺中使用數據流連接兩個json

另外我正試圖在Google的雲平臺上使用Dataflow來實現它。有人可以提供任何可以實現的Java代碼來獲得結果。

員工JSON

{"emp_id":"OrgEmp#1","emp_name":"Adam","emp_dept":"OrgDept#1","emp_country":"USA","emp_gender":"female","emp_birth_year":"1980","emp_salary":"$100000"} 
{"emp_id":"OrgEmp#1","emp_name":"Scott","emp_dept":"OrgDept#3","emp_country":"USA","emp_gender":"male","emp_birth_year":"1985","emp_salary":"$105000"} 

部JSON

{"dept_id":"OrgDept#1","dept_name":"Account","dept_start_year":"1950"} 
{"dept_id":"OrgDept#2","dept_name":"IT","dept_start_year":"1990"} 
{"dept_id":"OrgDept#3","dept_name":"HR","dept_start_year":"1950"} 

預期的輸出JSON文件應該像

{"emp_id":"OrgEmp#1","emp_name":"Adam","dept_name":"Account","emp_salary":"$100000"} 

回答

4

您可以使用CoGroupByKey(其中將使用shuffle)或使用副輸入(如果您的部門集合顯着更小)執行此操作。

我會給你Python中的代碼,但是你可以在Java中使用相同的管道。


同方的投入,你會:

  1. 將您的部門PCollection成映射 的dept_id該部門JSON字典詞典。

  2. 然後,您將012ol員工PCollection作爲主要輸入,您可以使用dept_id 獲取部門PCollection中每個部門的JSON。

像這樣:

departments = (p | LoadDepts() 
       | 'key_dept' >> beam.Map(lambda dept: (dept['dept_id'], dept))) 

deps_si = beam.pvalue.AsDict(departments) 

employees = (p | LoadEmps()) 

def join_emp_dept(employee, dept_dict): 
    return employee.update(dept_dict[employee['dept_id']]) 

joined_dicts = employees | beam.Map(join_dicts, dept_dict=deps_si) 

隨着CoGroupByKey,你可以使用的dept_id爲一鍵組中的兩個集合。這將導致關鍵值對的PCollection,其中關鍵是dept_id,該值是部門的兩個迭代器以及該部門中的員工。

​​
+1

請注意,使用側面輸入可能是更好的選擇,因爲巴勃羅提到部門集合可能比員工集合小。 –

+0

巴勃羅 - 感謝您的回覆。你能否提供一個你提到的步驟的班輪解釋。由於我在這方面是新手,所以一個小的解釋會有所幫助。 –

+0

我已經添加了一個解釋。另外,我移動了側面輸入解決方案,以便您首先考慮這一點。 – Pablo

相關問題