2015-04-21 40 views
6

我使用pyspark(Apache的星火)的數據幀API和正在運行到以下問題:來自同一來源的加入兩個DataFrames

當我連接兩個DataFrames從同一個源數據幀,所產生的DF起源會爆炸成很多行。一個簡單的例子:

我與n行從磁盤加載數據幀:

df = sql_context.parquetFile('data.parquet') 

然後,我創建從該源的兩個DataFrames。

df_one = df.select('col1', 'col2') 
df_two = df.select('col1', 'col3') 

最後,我想(內)加入他們一起回來:

df_joined = df_one.join(df_two, df_one['col1'] == df_two['col1'], 'inner') 

的關鍵在於col1是獨一無二的。生成的DataFrame應該有n行,但它確實有n*n行。

這不會發生,當我直接從磁盤加載df_onedf_two。我在Spark 1.3.0上,但是這也發生在當前的1.4.0快照上。

任何人都可以解釋爲什麼會發生?

+1

對不起,你想'df_one.merge(df_two,left_on ='col1',right_on ='col2',how ='inner')'? – EdChum

+1

@EdChum對不起,我忘了提及我正在使用Apache Spark並編輯了這個問題來反映這一點。我擔心Spark DataFrame上沒有'merge'這樣的東西。 – karlson

+0

好的以爲這可能是一個熊貓的問題 – EdChum

回答

0

我在Spark 1.3上也看到了我的大數據集中的這個問題。不幸的是,在我編寫的「小人物」中,人爲設計的例子正確運作。我覺得有從臺階前述的加盟或許

執行加入了一些潛在的bug(注:日期時間只是一個字符串):

> join = df1.join(df2, df1.DateTime == df2.DateTime, "inner") 
> join.count() 

250000L 

這顯然返回整個500個* 500笛卡兒連接。

什麼是對我的工作切換到SQL:

> sqlc.registerDataFrameAsTable(df1, "df1") 
    > sqlc.registerDataFrameAsTable(df2, "df2") 
    > join = sqlc.sql("select * from df1, df2 where df1.DateTime = df2.DateTime") 
    > join.count() 
    471L 

該數值看起來正確。

看到這個,我個人不會使用pyspark的DataFrame.join(),直到我能更好地理解這個差異。

3

如果我正確地讀這篇文章,df_two沒有COL2

df_one = df.select('col1', 'col2') 
    df_two = df.select('col1', 'col3') 

所以,當你這樣做:

df_one.join(df_two, df_one['col1'] == df_two['col2'], 'inner') 

這應該失敗。如果你打算說

df_one.join(df_two, df_one['col1'] == df_two['col1'], 'inner') 

但是,你從同一個數據幀加載的事實應該沒有影響。我建議你這樣做:

df_one.show() 
    df_two.show() 

確保您選擇的數據是您的預期。

+0

我的意思是說'df_one ['col1'] == df_two ['col1']'確實如此。我在問題中解決了這個錯字。 – karlson

+0

在這種情況下,請執行以下測試: df_joined.orderBy(['col1'])。show(10) 如果存在重複行,則我不確定,我想可能是由於您使用了舊火花的版本。如果每行不同,那麼col1並不是真正唯一的。從我的嘗試中,使用spark 1.5.2這不是一個問題,並且這個連接不會創建多個迭代。 – user3124181