2016-11-16 75 views
0

我有兩個數據幀(tx_df和login_df)。 第一個包含player_id,tx_id和tx_time列,第二個包含player_id和login_time。PySpark按最近的時間值連接兩個數據幀

我想要做的就是使用player_id列加入這兩個數據框,但除此之外,只加入login_df中的最新登錄行。 例如,如果有tx_df這樣的:

pid_1, txid_1, '2016-11-16 00:01:00' 
pid_1, txid_2, '2016-11-16 00:01:02' 
pid_1, txid_3, '2016-11-16 00:02:15' 
pid_1, txid_4, '2016-11-16 00:02:16' 
pid_1, txid_5, '2016-11-16 00:02:17' 

和login_df這樣的:

pid_1, '2016-11-16 00:02:10' 
pid_1, '2016-11-16 00:00:55' 
pid_1, '2016-11-13 00:03:00' 
pid_1, '2016-11-10 16:30:00' 

我想要得到的數據幀,看起來像這樣:

pid_1, txid_1, '2016-11-16 00:01:00', pid_1, '2016-11-16 00:00:55' 
pid_1, txid_2, '2016-11-16 00:01:02', pid_1, '2016-11-16 00:00:55' 
pid_1, txid_3, '2016-11-16 00:02:15', pid_1, '2016-11-16 00:02:10' 
pid_1, txid_4, '2016-11-16 00:02:16', pid_1, '2016-11-16 00:02:10' 
pid_1, txid_5, '2016-11-16 00:02:17', pid_1, '2016-11-16 00:02:10' 

我不是強制綁定到數據框架,所以暗示瞭如何使用RDD或任何其他方法很好地完成它,將不勝感激。

爆炸的數據是我所害怕的,因爲tx_df可以爲每個玩家id(然後有數千個玩家ID)擁有數千個交易條目,而login_df可能也有未知數量的玩家登錄信息。只需加入player_id這兩個參數就可以創建一個巨大的數據框架,因爲笛卡爾積不可接受。

注意:我正在爲Spark使用Python API。

回答

0

爲了將來的參考,我設法用稍微不同的方法解決這個問題。 我很幸運,第二個數據幀足夠小,可以播放它。更確切地說,我廣播了值的hashmap,但這只是因爲我發現它適合於這個目的。 (見:broadcast variables in Spark

然後,我遍歷所述第一數據幀的行這樣

tx_df.rdd.map(my_map_function) 

和my_map_function我訪問廣播hasmap,沒需要排序和其它操作和最終選擇了哪些值我想追加到第一個數據幀的行。

作爲一個很好的副作用,廣播值的hashmap,我能夠刪除數據幀的連接並加快速度。 之前這樣做,腳本有

  • 將數據加載到數據幀
  • 加入數據幀到大的
  • 過濾掉大數據幀

的不需要行該廣播解決方案後,腳本有

  • 將數據加載到數據幀中
  • 迭代僅在第一個,直接訪問第二個的值,並將其附加到當前行中的第二種方法是不需要

過濾,因爲正確的價值觀都已經是第二個

  • 廣播值拿起來讓腳本執行速度更快。

  • 相關問題