2016-09-27

Transforming a DataFrame with nested structs

I have a Spark DataFrame that looks like this:

root 
|-- employeeName: string (nullable = true) 
|-- employeeId: string (nullable = true) 
|-- employeeEmail: string (nullable = true) 
|-- company: struct (nullable = true) 
| |-- companyName: string (nullable = true) 
| |-- companyId: string (nullable = true) 
| |-- details: struct (nullable = true) 
| | |-- founded: string (nullable = true) 
| | |-- address: string (nullable = true) 
| | |-- industry: string (nullable = true) 

What I want to do is group by companyId and collect each company's employees into an array, like this:

root 
|-- company: struct (nullable = true) 
| |-- companyName: string (nullable = true) 
| |-- companyId: string (nullable = true) 
| |-- details: struct (nullable = true) 
| | |-- founded: string (nullable = true) 
| | |-- address: string (nullable = true) 
| | |-- industry: string (nullable = true) 
|-- employees: array (nullable = true)  
| |-- employee: struct (nullable = true)   
| | |-- employeeName: string (nullable = true) 
| | |-- employeeId: string (nullable = true) 
| | |-- employeeEmail: string (nullable = true) 

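Of course, if I only had (company, employee) pairs of type (String, String), I could easily do this with map and reduceByKey. With all the nested information, though, I am not sure which approach to take.

Should I try to flatten everything? Any examples of doing something similar would be very helpful.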

Answer

You can do the following:

// declaring data types 
case class Company(cName: String, cId: String, details: String) 
case class Employee(name: String, id: String, email: String, company: Company) 

// setting up example data 
val e1 = Employee("n1", "1", "[email protected]", Company("c1", "1", "d1")) 
val e2 = Employee("n2", "2", "[email protected]", Company("c1", "1", "d1")) 
val e3 = Employee("n3", "3", "[email protected]", Company("c1", "1", "d1")) 
val e4 = Employee("n4", "4", "[email protected]", Company("c2", "2", "d2")) 
val e5 = Employee("n5", "5", "[email protected]", Company("c2", "2", "d2")) 
val e6 = Employee("n6", "6", "[email protected]", Company("c2", "2", "d2")) 
val e7 = Employee("n7", "7", "[email protected]", Company("c3", "3", "d3")) 
val e8 = Employee("n8", "8", "[email protected]", Company("c3", "3", "d3")) 
val employees = Seq(e1, e2, e3, e4, e5, e6, e7, e8) 
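// toDS needs the session implicits; spark-shell imports them automatically,
// where `spark` is the predefined SparkSession
import spark.implicits._
val ds = sc.parallelize(employees).toDS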

// actual query to achieve what is mentioned in the question 
val result = ds.groupByKey(e => e.company).mapGroups((k, itr) => (k, itr.toList)) 
result.collect 

Result:

Array(

(Company(c1,1,d1),WrappedArray(Employee(n1,1,[email protected],Company(c1,1,d1)), Employee(n2,2,[email protected],Company(c1,1,d1)), Employee(n3,3,[email protected],Company(c1,1,d1)))), 

(Company(c2,2,d2),WrappedArray(Employee(n4,4,[email protected],Company(c2,2,d2)), Employee(n5,5,[email protected],Company(c2,2,d2)), Employee(n6,6,[email protected],Company(c2,2,d2)))), 

(Company(c3,3,d3),WrappedArray(Employee(n7,7,[email protected],Company(c3,3,d3)), Employee(n8,8,[email protected],Company(c3,3,d3))))) 
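The result above comes from the typed Dataset API. If the data is only available as an untyped DataFrame with the schema shown in the question, a comparable grouping can probably be written without flattening anything, by grouping on the `company` struct and collecting the employee columns into an array of structs. This is a minimal sketch, assuming Spark 2.0 or later (where `collect_list` accepts struct values); the object and method names `CompanyGrouping` and `groupEmployeesByCompany` are made up for illustration.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, collect_list, struct}

object CompanyGrouping {
  // Assumes `df` has the columns from the question:
  // employeeName, employeeId, employeeEmail and a nested `company` struct.
  def groupEmployeesByCompany(df: DataFrame): DataFrame =
    df.groupBy(col("company"))
      .agg(
        // Pack the three employee columns into one struct per row,
        // then gather those structs into an array per company.
        collect_list(
          struct(col("employeeName"), col("employeeId"), col("employeeEmail"))
        ).as("employees")
      )
}
```

Calling `CompanyGrouping.groupEmployeesByCompany(df).printSchema()` should show a `company` struct next to an `employees` array. Grouping on the whole struct treats two rows as the same company only if every nested field matches. If companyId alone identifies a company, grouping on `col("company.companyId")` and taking the first `company` value per group would be another option.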

The key point: you can pass any function you like to mapGroups to shape each group the way you want.

Hope this helps.
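As one illustration of shaping the groups inside mapGroups, the sketch below drops the repeated company from each employee so that the output is closer to the schema requested in the question. It reuses the `Company` and `Employee` case classes from the answer. `EmployeeInfo`, `CompanyEmployees`, `GroupEmployees`, and the `example.com` addresses are hypothetical names and placeholder data introduced here, and a local SparkSession on Spark 2.x is assumed.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Same shapes as the case classes in the answer
case class Company(cName: String, cId: String, details: String)
case class Employee(name: String, id: String, email: String, company: Company)

// Hypothetical output types, introduced only for this sketch
case class EmployeeInfo(name: String, id: String, email: String)
case class CompanyEmployees(company: Company, employees: Seq[EmployeeInfo])

object GroupEmployees {
  // Group by the whole company value and keep only the employee's own fields.
  def groupByCompany(ds: Dataset[Employee]): Dataset[CompanyEmployees] = {
    import ds.sparkSession.implicits._
    ds.groupByKey(_.company).mapGroups { (company, members) =>
      CompanyEmployees(
        company,
        members.map(e => EmployeeInfo(e.name, e.id, e.email)).toList)
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("group-employees")
      .getOrCreate()
    import spark.implicits._

    // Placeholder data, not the values from the answer
    val ds = Seq(
      Employee("n1", "1", "n1@example.com", Company("c1", "1", "d1")),
      Employee("n2", "2", "n2@example.com", Company("c1", "1", "d1")),
      Employee("n3", "3", "n3@example.com", Company("c2", "2", "d2"))
    ).toDS()

    val grouped = groupByCompany(ds)
    grouped.printSchema()           // company struct plus an employees array
    grouped.show(truncate = false)

    spark.stop()
  }
}
```

Note that mapGroups hands each group to the function as an iterator, and `toList` materializes it in memory, so a very large company could be a concern. That is a trade-off of this sketch rather than something the answer addresses.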

Thanks, I managed to solve it in a similar way. – Dmitri