2016-05-16 36 views
0

我試圖運行一個簡單的數據流Python管道,它從BigQuery獲取特定的用戶事件並生成每用戶事件計數。在谷歌雲數據流中使用CombinePerKey Python

p = df.Pipeline(argv=pipeline_args) 
result_query = "..." 
data = p | df.io.Read(df.io.BigQuerySource(query=result_query)) 
user_events = data|df.Map(lambda x: (x['users_user_id'], 1)) 
user_event_counts = user_events|df.CombinePerKey(sum) 

運行這給了我一個錯誤:CombinePerKey

TypeError: Expected tuple, got int [while running 'Map(<lambda at user_stats.py:...>)'] 

數據轉化爲這種形式:

(u'55107178236374', 1) 
(u'55107178236374', 1) 
(u'55107178236374', 1) 
(u'2296845644499670', 1) 
(u'2296845644499670', 1) 
(u'1489727796186326', 1) 
(u'1489727796186326', 1) 
(u'1489727796186326', 1) 
(u'1489727796186326', 1) 

相反,如果計算user_event_counts本:

user_event_counts = (user_events|df.GroupByKey()| 
    df.Map('count', lambda (user, ones): (user, sum(ones)))) 

然後沒有錯誤,我得到了我期望的結果。

基於docs我會期待這兩種方法的類似行爲。我明顯錯過了CombinePerKey,但我看不到它是什麼。任何提示讚賞!

回答

相關問題