2017-08-25 95 views
2

給定一個相對較小的數據源(3,000-10,000)的鍵/值對,我試圖只處理符合組閾值(50-100)的記錄。所以最簡單的方法是將它們按鍵,過濾和展開進行分組 - 無論是使用FlatMap還是ParDo。迄今爲止,最大的團體只有1,500條記錄。但這似乎是Google Cloud Dataflow生產中的一個嚴重瓶頸。爲什麼Apache Beam中的GroupByKey之後的FlatMap如此之慢?

隨着給定的列表

(1,1) (1,2) (1,3) ... (2,1) (2,2) (2,3) ...

通過鍵變換集過濾和組的運行:

p | 'Group' >> beam.GroupByKey() 
    | 'Filter' >> beam.Filter(lambda (key, values): len(list(values)) > 50) 
    | 'Unwind' >> beam.FlatMap(lambda (key, values): values) 

任何想法如何使這更好的性能?謝謝你的幫助!

+0

請與調查結果報告!如果答案有用,請選擇它。 – Pablo

回答

2

這是一個有趣的管道案例。我相信你的問題在於你讀取來自GroupByKey的數據。讓我簡單介紹一下GBK的工作原理。

什麼GroupByKey,以及如何大數據系統實現它

所有的大數據系統實現方式實現在同一個密鑰的多個元素的操作。這在MapReduce中被稱爲減少,在其他大數據系統中被稱爲Group By Key或Combine。

當您執行GroupByKey轉換時,Dataflow需要將單個密鑰的所有元素收集到同一臺機器中。由於同一個密鑰的不同元素可能在不同的機器上處理,所以數據需要以某種方式序列化。

這意味着當您讀取來自GroupByKey的數據時,您正在訪問工作人員的IO(即不是來自內存),所以您確實要避免多次讀取shuffle數據。

這如何轉化爲您的管道

我相信,在這裏你的問題是,FilterUnwind都將分別讀取洗牌的數據(所以你會讀出每個列表數據兩次)。你想要做的只是讀你的洗牌數據一次。您可以使用單一的FlatMap來完成此操作,該功能既可以在不洗牌的情況下進行雙重讀取,也可以對數據進行過濾和展開。就像這樣:

def unwind_and_filter((key, values)): 
    # This consumes all the data from shuffle 
    value_list = list(values) 
    if len(value_list) > 50: 
    yield value_list 

p | 'Group' >> beam.GroupByKey() 
    | 'UnwindAndFilter' >> beam.FlatMap(unwind_and_filter) 

讓我知道這是否有幫助。

+0

感謝您的迴應!儘管嘗試了不同的變化,但它沒有效果。問題不在於它分組的速度有多快,而是在FlatMap階段中從組發出的展開元素的速度有多快 - 每秒鐘兩到三次,而所有其他轉換幾乎立即進行。而且我知道它不可並行,因爲它必須位於同一臺機器上。 –

+0

平面圖之後你有什麼變革?你有工作ID嗎? – Pablo

+0

確定它是一個爲每個驗證點添加隨機記錄的ParDo,然後是爲遠程查詢批量隨機記錄的另一個記錄。作業ID是:2017-08-26_06_10_38-4021415491644394676。 –

相關問題