
Iterating over rows in a DataFrame and setting values in Spark

I'd like to preface this question by saying that I'm a Spark noob (I only started reading a book on it 4 days ago). Nevertheless, I'm trying to port something I wrote in Python with the help of the Pandas library so that I can take advantage of the cluster we just created. The data in the pandas DataFrame df looks like this:

+---------------------+-----------+-------+-------------+----------------------+
|      TimeStamp      | Customer  | User  | Application | TimeSinceApplication |
+---------------------+-----------+-------+-------------+----------------------+
| 2017-01-01 00:00:01 | customer1 | user1 | app1        |                  NaN |
| 2017-01-01 00:01:01 | customer1 | user2 | app2        |                  NaN |
| 2017-01-01 00:02:01 | customer1 | user1 | app2        |                  NaN |
| 2017-01-01 00:03:01 | customer1 | user1 | app1        |                  NaN |
+---------------------+-----------+-------+-------------+----------------------+

In Python, I wrote:

unique_users = df.User.unique().tolist()
for j in range(0, len(unique_users)):
    user = unique_users[j]
    access_events_for_user = df[df.User == user].copy()
    indexes_for_access_events = access_events_for_user.index
    applications_used = dict()
    for i in range(0, len(indexes_for_access_events)):
        current_access_event_index = int(indexes_for_access_events[i])
        current_access_event_ts = df.loc[current_access_event_index].TimeStamp
        if i == 0:
            # First event for this user: 30-day default (2592000 seconds)
            df.loc[current_access_event_index, 'TimeSinceApplication'] = 2592000
            applications_used[df.loc[current_access_event_index].Application] = current_access_event_ts
            continue
        if df.loc[current_access_event_index].Application in applications_used:
            time_since = (current_access_event_ts -
                          applications_used[df.loc[current_access_event_index].Application]).total_seconds()
            df.loc[current_access_event_index, 'TimeSinceApplication'] = time_since
            applications_used[df.loc[current_access_event_index].Application] = current_access_event_ts
        else:
            # First time this user touches this application: 30-day default
            df.loc[current_access_event_index, 'TimeSinceApplication'] = 2592000
            applications_used[df.loc[current_access_event_index].Application] = current_access_event_ts

What it spits out is this:

+---------------------+-----------+-------+-------------+----------------------+
|      TimeStamp      | Customer  | User  | Application | TimeSinceApplication |
+---------------------+-----------+-------+-------------+----------------------+
| 2017-01-01 00:00:01 | customer1 | user1 | app1        |              2592000 |
| 2017-01-01 00:01:01 | customer1 | user2 | app2        |              2592000 |
| 2017-01-01 00:02:01 | customer1 | user1 | app2        |              2592000 |
| 2017-01-01 00:03:01 | customer1 | user1 | app1        |                  180 |
+---------------------+-----------+-------+-------------+----------------------+

Basically, I'm trying to get the time elapsed since the user last accessed the application. If it's the first time the user has accessed that application, I set it to a default of 30 days (2592000 seconds). We can partition the data by customer and sort it by timestamp so that it's processed in order. I'm just not sure how to do this in Spark without calling collect(), as in the answer here, which defeats the purpose of Spark. Is this even possible?

Answers


This is approaching the limit of what is practical with the DataFrame API. Someone else may be able to suggest a way to do this with DataFrames, but personally I think the RDD API is better suited to it. Here is an example to give you an idea of how to structure the algorithm for Spark:

from datetime import datetime, timedelta

data = [(datetime(2017, 1, 1, 0, 0, 1), 'customer1', 'user1', 'app1'),
        (datetime(2017, 1, 1, 0, 1, 1), 'customer1', 'user2', 'app2'),
        (datetime(2017, 1, 1, 0, 2, 1), 'customer1', 'user1', 'app2'),
        (datetime(2017, 1, 1, 0, 3, 1), 'customer1', 'user1', 'app1')]

rdd = sc.parallelize(data) 

def toTimeSince(row):
    cust_user_app, timestamps = row
    timestamps = sorted(timestamps)
    # First access for this (customer, user, app) gets the 30-day default
    result = [(timestamps[0], *cust_user_app, timedelta(30))]
    previous_timestamp = timestamps[0]
    for timestamp in timestamps[1:]:
        result.append((timestamp, *cust_user_app, timestamp - previous_timestamp))
        previous_timestamp = timestamp
    return result

(rdd 
.map(lambda row: (row[1:], [row[0]])) # Data looks like ((customer, user, app), [timestamp]) 
.reduceByKey(lambda a, b: a + b) # Data looks like ((customer, user, app), list_of_timestamps) 
.flatMap(toTimeSince) # Data looks like (timestamp, customer, user, app, time_since_previous) 
.collect()) 

Result:

[(datetime.datetime(2017, 1, 1, 0, 1, 1), 'customer1', 'user2', 'app2', datetime.timedelta(30)), 
(datetime.datetime(2017, 1, 1, 0, 2, 1), 'customer1', 'user1', 'app2', datetime.timedelta(30)), 
(datetime.datetime(2017, 1, 1, 0, 0, 1), 'customer1', 'user1', 'app1', datetime.timedelta(30)), 
(datetime.datetime(2017, 1, 1, 0, 3, 1), 'customer1', 'user1', 'app1', datetime.timedelta(0, 180))] 

The key points are:

  • The algorithm as you have described it is not inherently well suited to Spark: there are strong dependencies between rows (each row must be computed by comparing it with another row), which is hard to parallelise.
  • My suggestion is to use Spark to aggregate a list of timestamps for records with the same customer, user, and application. After that, it is straightforward to sort the timestamps for each customer-user-application combination and expand them back out into the dataset you want.
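Since the question specifically asks about avoiding collect(), here is a minimal variation of the pipeline above (an editorial sketch, not part of the original answer) that keeps the result distributed by converting it back to a DataFrame. The seconds conversion, the column names, and the presence of an active SparkSession are assumptions.

def toTimeSinceSeconds(row):
    # Reuse toTimeSince, but express the time delta in seconds so the schema
    # can be inferred when building a DataFrame (assumption, not from the answer).
    for timestamp, customer, user, app, delta in toTimeSince(row):
        yield (timestamp, customer, user, app, delta.total_seconds())

time_since_df = (rdd
                 .map(lambda row: (row[1:], [row[0]]))
                 .reduceByKey(lambda a, b: a + b)
                 .flatMap(toTimeSinceSeconds)
                 .toDF(['TimeStamp', 'Customer', 'User', 'Application', 'TimeSinceApplication']))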

Great solution. @timchap, could you please read my solution and tell me whether or not it is correct? –


You can probably use window functions in pyspark, partitioned by user and application, since those will be your windows. You then assign a rank and, if the rank is 1, set your default value; otherwise use the current time minus the previous time. I think that is what you are trying to do.

In SQL terms you would use a PARTITION BY clause, but to do this in pyspark you have to use a window. Hope this solves your problem; I'm feeling a bit lazy to write the code, sorry.


I'm not asking anyone to hand me a solution, but a code snippet would be really helpful. –
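For illustration, here is a minimal sketch of the window/lag approach described in the answer above, assuming a Spark DataFrame df with the columns shown in the question and TimeStamp stored as a timestamp type; partitioning by Customer as well as User and Application, and the 2592000-second default, follow the question rather than the answer.

from pyspark.sql import Window
from pyspark.sql import functions as F

# Window per customer, user, and application, ordered by time.
w = Window.partitionBy('Customer', 'User', 'Application').orderBy('TimeStamp')

result = (df
          .withColumn('PrevTimeStamp', F.lag('TimeStamp').over(w))
          .withColumn('TimeSinceApplication',
                      F.when(F.col('PrevTimeStamp').isNull(), F.lit(2592000))  # first access: 30-day default
                       .otherwise(F.unix_timestamp('TimeStamp') - F.unix_timestamp('PrevTimeStamp')))
          .drop('PrevTimeStamp'))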