0
我試圖運行一個簡單的數據流Python管道,它從BigQuery獲取特定的用戶事件並生成每用戶事件計數。在谷歌雲數據流中使用CombinePerKey Python
p = df.Pipeline(argv=pipeline_args)
result_query = "..."
data = p | df.io.Read(df.io.BigQuerySource(query=result_query))
user_events = data|df.Map(lambda x: (x['users_user_id'], 1))
user_event_counts = user_events|df.CombinePerKey(sum)
運行這給了我一個錯誤:CombinePerKey
前
TypeError: Expected tuple, got int [while running 'Map(<lambda at user_stats.py:...>)']
數據轉化爲這種形式:
(u'55107178236374', 1)
(u'55107178236374', 1)
(u'55107178236374', 1)
(u'2296845644499670', 1)
(u'2296845644499670', 1)
(u'1489727796186326', 1)
(u'1489727796186326', 1)
(u'1489727796186326', 1)
(u'1489727796186326', 1)
相反,如果計算user_event_counts
本:
user_event_counts = (user_events|df.GroupByKey()|
df.Map('count', lambda (user, ones): (user, sum(ones))))
然後沒有錯誤,我得到了我期望的結果。
基於docs我會期待這兩種方法的類似行爲。我明顯錯過了CombinePerKey
,但我看不到它是什麼。任何提示讚賞!