
Spark SQL: how to filter a DataFrame twice and then join the results together?

I am trying to filter and join to do a simple pivot, but I am getting very strange results.

val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
import sqlContext.implicits._ 

val people = Array((1, "sam"), (2, "joe"), (3, "sally"), (4, "joanna")) 
val accounts = Array(
    (1, "checking", 100.0), 
    (1, "savings", 300.0), 
    (2, "savings", 1000.0), 
    (3, "carloan", 12000.0), 
    (3, "checking", 400.0) 
) 

val t1 = sc.makeRDD(people).toDF("uid", "name") 
val t2 = sc.makeRDD(accounts).toDF("uid", "type", "amount") 

val t2c = t2.filter(t2("type") <=> "checking") 
val t2s = t2.filter(t2("type") <=> "savings") 

t1. 
    join(t2c, t1("uid") <=> t2c("uid"), "left"). 
    join(t2s, t1("uid") <=> t2s("uid"), "left"). 
    take(10) 

The result is wrong:

Array(
    [1,sam,1,checking,100.0,1,savings,300.0], 
    [1,sam,1,checking,100.0,2,savings,1000.0], 
    [2,joe,null,null,null,null,null,null], 
    [3,sally,3,checking,400.0,1,savings,300.0], 
    [3,sally,3,checking,400.0,2,savings,1000.0], 
    [4,joanna,null,null,null,null,null,null] 
) 
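
My unconfirmed suspicion is that, because t2c and t2s are both derived from the same t2, the uid columns in the second join condition resolve to the same underlying attribute, so the condition becomes trivially true. One way to look into this is to print the query plan with the standard explain() call:

// Print the extended query plan for the double join (DataFrame.explain
// accepts an extended flag in Spark 1.x); this is only to inspect how the
// ambiguous uid references get resolved.
t1.
    join(t2c, t1("uid") <=> t2c("uid"), "left").
    join(t2s, t1("uid") <=> t2s("uid"), "left").
    explain(true)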

The only way I can force it to work correctly is to create a new DataFrame for each filter:

val t2a = sc.makeRDD(accounts).toDF("uid", "type", "amount") 
val t2s = t2a.filter(t2a("type") <=> "savings") 

t1. 
    join(t2c, t1("uid") <=> t2c("uid"), "left"). 
    join(t2s, t1("uid") <=> t2s("uid"), "left"). 
    take(10) 

The result is correct:

Array(
    [1,sam,1,checking,100.0,1,savings,300.0], 
    [2,joe,null,null,null,2,savings,1000.0], 
    [3,sally,3,checking,400.0,null,null,null], 
    [4,joanna,null,null,null,null,null,null] 
) 

This solution is not feasible, so is there a better way to do it?
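
One workaround I have been considering (sketched below, untested) is to rename the join key in each filtered frame so the second join condition cannot resolve back to the same column of t2; the c_uid / s_uid names are just illustrative:

// Untested sketch: give each filtered view its own key column name so the
// two join conditions refer to unambiguous columns.
val t2c = t2.filter(t2("type") <=> "checking").withColumnRenamed("uid", "c_uid")
val t2s = t2.filter(t2("type") <=> "savings").withColumnRenamed("uid", "s_uid")

t1.
    join(t2c, t1("uid") <=> t2c("c_uid"), "left").
    join(t2s, t1("uid") <=> t2s("s_uid"), "left").
    take(10)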


Would you care to explain why your solution is not feasible? – eliasah
