0

我試圖構建一個ETL來加載一個Dimension表。我使用Python和DataFlow和BigQuery來分配Apache Bea。Apache-Beam將序列號添加到PCollection中

我需要爲pcollection的每個元素分配一個序列號,以便將其加載到BigQuery中,但我找不到任何方法來執行此操作。

我想我需要DataFlow使先前的聚合和連接,以獲得我最後的pcollection添加序列號,但在這一刻我需要停止並行處理,並將我的pcollection投到列表(如在Spark中,當你使用.collect()),然後製作一個簡單的循環來分配序列號。這樣對嗎?

這是管道,我編碼:

p | ReadFromAvro(known_args.input) | beam.Map(adapt) | beam.GroupByKey() | beam.Map(adaptGroupBy) 

我讀過沒有辦法擺脫pcollection列表: How to get a list of elements out of a PCollection in Google Dataflow and use it in the pipeline to loop Write Transforms?

我怎樣才能實現呢?任何幫助?

+0

你能後你有什麼到目前爲止已經試過,和代碼? –

+0

這是我使用Beam的第一種方法。我會添加我的一段代碼,但我沒有找到任何方法。 –

+0

你能詳細說明爲什麼你認爲你需要添加序號嗎?那麼你計劃在BigQuery中做什麼,這將需要這個序列號? –

回答

1

如果您想要的是獲取PCollection中每個元素的列表,則可以使用側面輸入。請記住,這將從您的結果中刪除所有並行性,並且您的管道可能會變慢。

如果您仍然想這樣做的話:

side_input_coll = beam.pvalue.AsIterable(my_collection) 

(p 
| beam.Create([0]) 
| beam.FlatMap(lambda _, my_seq: [(elem, i) for i, elem in enumerate(my_seq)], 
       my_seq=side_input_coll)) 

但是不要忘記,爲了維護並行性,它可能是最好簡單地生成一個隨機ID。請記住PCollections本質上是無序的。

要了解更多關於側面輸入,看到Beam Programming Guide on Side Inputs