問題:和批量處理存儲元素
我有我的數據存儲項目的2M +用戶數據列表。我想向所有用戶發送每週通訊。郵件API接受每個API調用最多50個電子郵件地址。
先前的解決方案:
二手應用程序引擎的後端和簡單的數據存儲區查詢一氣呵成處理的所有記錄。但是會發生什麼呢,有時候我會得到內存溢出嚴重錯誤日誌,並且這個過程又重新開始。由於這一些用戶,不止一次收到相同的電子郵件。所以我轉向了數據流。
目前的解決方案:
我使用FlatMap功能,每封電子郵件ID發送給一個函數,然後逐一發送電子郵件給每個用戶。
def process_datastore(project, pipeline_options):
p = beam.Pipeline(options=pipeline_options)
query = make_query()
entities = (p | 'read from datastore' >> ReadFromDatastore(project, query))
entities | beam.FlatMap(lambda entity: sendMail([entity.properties.get('emailID', "")]))
return p.run()
通過雲數據流,我確保每個用戶只收到一次郵件,而且沒有人遺漏。沒有內存錯誤。
但是這個當前過程需要7個小時才能完成運行。我試圖用ParDo替換FlatMap,並假定ParDo將並行化該過程。但即便如此,也需要同一時間。
問:
如何在一堆50組的電子郵件ID,從而有效地使用郵件API調用?
如何並行化過程,使所需時間少於一個小時?
您的管道可能遭受此處所述的其中一種情況:https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py#L58 。在這種情況下,你需要通過分解融合https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion – jkff
來引入更多的並行性。另外:FlatMap和ParDo是等價的 - 它們都是並行的,但都受到融合(見上文)。一般來說,當調試作業的性能時,請包含一個oncall工程師可以查看的數據流作業ID。 – jkff