Iterating through rows in a DataFrame and setting values in Spark

I want to preface this question by saying that I'm a Spark noob (I only started reading a book on it four days ago). That said, I'm trying to port something I originally wrote in Python with the help of the pandas library, so that I can take advantage of the cluster we just created. The data in the pandas DataFrame df looks like this:
+---------------------+-----------+-------+-------------+----------------------+
| TimeStamp | Customer | User | Application | TimeSinceApplication |
+---------------------+-----------+-------+-------------+----------------------+
| 2017-01-01 00:00:01 | customer1 | user1 | app1 | NaN |
| 2017-01-01 00:01:01 | customer1 | user2 | app2 | NaN |
| 2017-01-01 00:02:01 | customer1 | user1 | app2 | NaN |
| 2017-01-01 00:03:01 | customer1 | user1 | app1 | NaN |
+---------------------+-----------+-------+-------------+----------------------+
In Python, I wrote:
# df is the pandas DataFrame shown above; TimeStamp is a datetime column
unique_users = df.User.unique().tolist()
for j in range(0, len(unique_users)):
    user = unique_users[j]
    access_events_for_user = df[df.User == user].copy()
    indexes_for_access_events = access_events_for_user.index
    applications_used = dict()  # last access timestamp per application for this user
    for i in range(0, len(indexes_for_access_events)):
        current_access_event_index = int(indexes_for_access_events[i])
        current_access_event_ts = df.loc[current_access_event_index].TimeStamp
        application = df.loc[current_access_event_index].Application
        if i == 0:
            # user's first event overall: apply the 30-day default, remember the app
            df.loc[current_access_event_index, 'TimeSinceApplication'] = 2592000
            applications_used[application] = current_access_event_ts
            continue
        if application in applications_used:
            # seconds since this user's previous access to the same application
            time_since = (current_access_event_ts -
                          applications_used[application]).total_seconds()
            df.loc[current_access_event_index, 'TimeSinceApplication'] = time_since
        else:
            # first access to this application: 30-day default
            df.loc[current_access_event_index, 'TimeSinceApplication'] = 2592000
        applications_used[application] = current_access_event_ts
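As an aside, I believe the same per-user, per-application delta could be computed in pandas without the explicit loop; here's a rough sketch for comparison (again assuming TimeStamp is a datetime column):

# Seconds since this user's previous access to the same application;
# first occurrences come back as NaT and fall back to the 30-day default.
# (Add 'Customer' to the key if usernames can repeat across customers.)
deltas = (df.sort_values('TimeStamp')
            .groupby(['User', 'Application'])['TimeStamp']
            .diff())
df['TimeSinceApplication'] = deltas.dt.total_seconds().fillna(2592000)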
It spits out something like this:
+---------------------+-----------+-------+-------------+----------------------+
| TimeStamp | Customer | User | Application | TimeSinceApplication |
+---------------------+-----------+-------+-------------+----------------------+
| 2017-01-01 00:00:01 | customer1 | user1 | app1 | 2592000 |
| 2017-01-01 00:01:01 | customer1 | user2 | app2 | 2592000 |
| 2017-01-01 00:02:01 | customer1 | user1 | app2 | 2592000 |
| 2017-01-01 00:03:01 | customer1 | user1 | app1 | 180 |
+---------------------+-----------+-------+-------------+----------------------+
Basically, I'm trying to get the time since the user last accessed the application. If it's the first time the user has accessed that application, I set the value to a default of 30 days (2592000 seconds). We can partition the data by customer and sort it by timestamp so the events are in order. I'm just not sure how to do this in Spark without calling collect(), as in the answer here, which would defeat the purpose of Spark. Is this even possible?
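From the little reading I've done so far, window functions look like they might be the right tool here; below is a rough, untested sketch of what I have in mind, assuming the data has been loaded into a Spark DataFrame (e.g. sdf = spark.createDataFrame(df)) with TimeStamp as an actual timestamp column:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Previous access to the same application by the same user, in timestamp order
w = Window.partitionBy('Customer', 'User', 'Application').orderBy('TimeStamp')
prev_ts = F.lag('TimeStamp').over(w)

sdf = sdf.withColumn(
    'TimeSinceApplication',
    F.when(prev_ts.isNull(), F.lit(2592000))  # first access: 30-day default
     .otherwise(F.col('TimeStamp').cast('long') - prev_ts.cast('long')))

If that's the right general idea, it should run entirely on the executors with no collect(), but I'd welcome confirmation that lag over a window is the idiomatic way to express this.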
Great solution. @timchap could you please read my solution and tell me whether or not it is correct? –