2016-08-22 58 views
-2

在Apache中Saprk檢查比較兩個數據幀我有adataframe用以下結構從歷史使用Scala的

EmployeeDF

id name date  code 
    1 John 2015-4-14 C11 
    2 Roy 2011-5-20 C11 
    3 John 2010-5-20 C11 
    4 John 2012-5-20 C10 

不,我要檢查的歷史,如果相同的代碼是適用於同一員工兩年前。我怎樣才能做到這一點。這只是樣本數據,我在數據框中擁有數百萬的數據,並且我想實現性能。加入數據框會降低性能,因爲行是重複的,所以我使用笛卡爾並在自加入過程中複製行。我想用地圖等東西來實現。

編輯:當前的代碼

在第一步中,我得到那些誰重複超過一次,因爲我們正在檢查的歷史,如果有的員工只存在員工(從OP的留言中加入。)一旦這意味着這個員工沒有歷史。因此,對於這一步的代碼是:

val uniqueEmpDF = SparkConfig 
    .sc 
    .sqlContext 
    .sql("SELECT *, '1' as level FROM cpeFirstStep WHERE e_id IN(SELECT e_id FROM cpeFirstStep where code = 'C11' " + " GROUP BY e_id HAVING COUNT(e_id)=1)") 
    .cache() 

第二步是讓誰被重複,而代碼是這樣的員工:

val repeatedEmpDF = SparkConfig 
    .sc 
    .sqlContext 
    .sql("SELECT *, '2' as level FROM cpeFirstStep WHERE e_id IN(SELECT e_id FROM cpeFirstStep where code = 'C11' " + " GROUP BY e_id HAVING COUNT(e_id)>1)") 
    .cache() 

現在主要步驟如下:

val historyJoin = SparkConfig 
    .sc 
    .sql("SELECT x.*, CASE WHEN y.code = x.code THEN '3' ELSE '4' END level FROM repeatedEmptDF X " + "LEFT JOIN repeatedEmptDF Y ON y.e_id = x.e_id AND y.code = x.code " + "AND y.date < x.data - INTERVAL 2 YEAR") 
+0

2年前? 2年前的含義是什麼?你可以在''code「和」id「'上分組,然後檢查日期條件。 –

+0

請提供您所寫的代碼。它會讓我們更容易幫助你。 –

+0

@ Sarvesh Kumar Singh我想檢查每位員工的歷史,如果兩年或多年前同一員工適用同一代碼,則將此行標記爲1級,並在其他所有情況下將此行標記爲2級。 –

回答

1

所以,寫這個有很多不同的方法,但假設我已經正確理解你的例子,下面的spark代碼就可以實現。請注意,我已爲您提供的樣本添加了一些額外數據,並且我還假定該員工約翰應該擁有相同的ID。所以我的測試輸入是這樣的:

import org.joda.time.LocalDate 
val df = sc.parallelize(List((1, "John", new LocalDate(2015,4,14), "C11"),(2, "Roy", new LocalDate(2011,5,20), "C11"),(1, "John", new LocalDate(2010,5,20), "C11"),(1, "John", new LocalDate(2012,5,20), "C10"),(1, "John", new LocalDate(2013,1,14), "C11"))) 

那麼對於員工的實際識別那些有同樣code爲至少2年:

df.map{case (id: Int, name: String, date: LocalDate, code: String) => ((id, name), List((date, code)))} 
    .reduceByKey(_++_) 
    .filter{case(_, listOfCodes) => listOfCodes.length >= 2} // Not interested in employees with only one code registered 
    .flatMapValues(list => { 
    def sameCodeForTwoYears(list: List[(LocalDate, String)]): List[(LocalDate, String)] = { 
     list match { 
     case x :: Nil => List.empty 
     case x :: xs => if (xs.head._1.minusYears(2).isAfter(x._1) && x._2 == xs.head._2) { 
      List(x, xs.head) 
     } else sameCodeForTwoYears(xs) 
     case Nil => List.empty 
     } 
    } 
    sameCodeForTwoYears(list.sortWith((left, right) => left._1.isBefore(right._1)))}) 
    .map{case((id, name),(date, code)) => (id, name, date, code)} 

這將輸出:

(1,John,2013-01-14,C11)               
(1,John,2015-04-14,C11) 

這是你在找什麼?

我不知道你會在你的數據集上得到什麼樣的表現,但希望你能得到這個如何寫在Spark中的一般想法。

+0

thanx。你引導我走向正確的道路。現在我們有RDD [(Int,String,LocalDate,String)],並且如果我有RDD [Row],我們通過case類映射它,那麼我怎麼能映射它。我知道這是非常基本的,但我是新手。 –

+0

很高興爲您提供幫助。如果答案是你正在尋找的,你能否接受它。這樣,我得到了幫助,並且問題將從「未回答的問題」列表中消失:-) –