2016-10-26 54 views
3

所以我有一個用戶名的數據框他們已經發布的線程和這些帖子的時間戳。如果要弄清楚誰是線程的第一個用戶,以及什麼時間是什麼,我該怎麼辦。我知道要弄清楚第一篇文章是在線上做一個小組,然後在時間戳上做一個小時。但是,這將刪除用戶名。我如何使用該組並保留用戶名?使用group by時,在Spark中保留未使用的列?

+2

按用戶名稱分組 – eliasah

回答

1

您可以通過使用HiveContext和Hive的named_struct函數來執行此操作。訣竅是min將按照從左到右的順序評估列來處理結構,如果當前列相同,則只移動到下一列。因此,在這種情況下,它實際上只是比較時間戳列,但是通過在min函數吐出結果之後創建一個包含名稱的結構,您將可以訪問該結構。

data = [ 
    ('user', 'thread', 'ts'), 
    ('ryan', 1, 1234), 
    ('bob', 1, 2345), 
    ('bob', 2, 1234), 
    ('john', 2, 2223) 
] 

header = data[0] 
rdd = sc.parallelize(data[1:]) 
df = sqlContext.createDataFrame(rdd, header) 
df.registerTempTable('table') 

sql = """ 
SELECT thread, min(named_struct('ts', ts, 'user', user)) as earliest 
FROM table 
GROUP BY thread 
""" 

grouped = sqlContext.sql(sql) 
final = grouped.selectExpr('thread', 'earliest.user as user', 'earliest.ts as timestamp') 
1

這可以使用row_number()窗口函數來完成,這將使所有其他列保持不變。 使用withColumn創建一個類似「thread_user_order」的新列,其值應爲row_number()PARTITION BY線程ORDER BY ts。 然後過濾器「thread_user_order」== 1

下面是一些僞代碼:

df.withColumn("thread_user_order", row_number().over(Window.partitionBy(col("thread")).orderBy(col("ts")))).where(col("thread_user_order").equalTo(1)) 
1

您可以使用該結構的排序爲場通過順序和排序同時保留兩列。然後,當您撥打min時,它會先按時間戳排序,然後再按用戶名(如果/當兩次相連)進行排序。

user_time = functions.struct(df.timestamp, df.username).alias('user_time') 
min_thread_users_df = df.select(df.thread, user_time).groupby('thread').agg(
    functions.min('user_time').alias('user_time')).select(
    'thread', 'user_time.username', 'user_time.timestamp')