2017-09-13 34 views
0

問題:和批量處理存儲元素

我有我的數據存儲項目的2M +用戶數據列表。我想向所有用戶發送每週通訊。郵件API接受每個API調用最多50個電子郵件地址。

先前的解決方案:

二手應用程序引擎的後端和簡單的數據存儲區查詢一氣呵成處理的所有記錄。但是會發生什麼呢,有時候我會得到內存溢出嚴重錯誤日誌,並且這個過程又重新開始。由於這一些用戶,不止一次收到相同的電子郵件。所以我轉向了數據流。

目前的解決方案:

我使用FlatMap功能,每封電子郵件ID發送給一個函數,然後逐一發送電子郵件給每個用戶。

def process_datastore(project, pipeline_options): 
    p = beam.Pipeline(options=pipeline_options) 
    query = make_query() 
    entities = (p | 'read from datastore' >> ReadFromDatastore(project, query)) 
    entities | beam.FlatMap(lambda entity: sendMail([entity.properties.get('emailID', "")])) 
    return p.run() 

通過雲數據流,我確保每個用戶只收到一次郵件,而且沒有人遺漏。沒有內存錯誤。

但是這個當前過程需要7個小時才能完成運行。我試圖用ParDo替換FlatMap,並假定ParDo將並行化該過程。但即便如此,也需要同一時間。

問:

  1. 如何在一堆50組的電子郵件ID,從而有效地使用郵件API調用?

  2. 如何並行化過程,使所需時間少於一個小時?

+0

您的管道可能遭受此處所述的其中一種情況:https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py#L58 。在這種情況下,你需要通過分解融合https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion – jkff

+0

來引入更多的並行性。另外:FlatMap和ParDo是等價的 - 它們都是並行的,但都受到融合(見上文)。一般來說,當調試作業的性能時,請包含一個oncall工程師可以查看的數據流作業ID。 – jkff

回答

0

你可以使用查詢光標在50批次的用戶分開,做俯臥撐隊列或deferred任務內的實際批量處理(電子郵件發送)。這將是一個僅GAE的解決方案,沒有云數據流,恕我直言很簡單。

您可以在Google appengine: Task queue performance中找到此類處理的示例(同時考慮到該答案)。該解決方案使用deferred庫,但使用推送隊列任務幾乎是微不足道的。

答案涉及並行性方面,您可能想限制它以降低成本。

您也可以將批處理本身分配到任務中,以獲得無限可擴展的解決方案(任意數量的接收者,不會超過內存或超時失敗的時間),任務重新排隊以繼續其離開的地方關閉。