2017-10-05 59 views
1

我寫了一個Dataflow作業,當我手動運行它時效果很好。下面是相關部分(爲清楚起見移除了一些驗證碼):數據流模板中的動態bigquery查詢

parser.add_argument('--end_datetime', 
        dest='end_datetime') 
known_args, pipeline_args = parser.parse_known_args(argv) 

query = <redacted SQL String with a placeholder for a date> 
query = query.replace('#ENDDATETIME#', known_args.end_datetime) 

with beam.Pipeline(options=pipeline_options) as p: 
    rows = p | 'read query' >> beam.io.Read(beam.io.BigQuerySource(query=query, use_standard_sql=True)) 

現在我想創建一個模板,並安排其與動態ENDDATETIME定期運行。據我瞭解,爲了做到這一點,我需要改變add_argument按照本文檔add_value_provider_argument:

https://cloud.google.com/dataflow/docs/templates/creating-templates

遺憾的是,似乎ValueProvider值不可用,當我需要他們,他們是唯一可用在管道內部。 (如果我在這裏錯了,請糾正我......)。所以我有點卡住了。

有沒有人有關於如何在數據流模板中獲取動態日期到我的查詢中的任何指針?

回答

4

Python目前僅支持FileBasedSource IO的ValueProvider選項。您可以通過單擊您使用的鏈接上的Python選項卡來看到: https://cloud.google.com/dataflow/docs/templates/creating-templates

在「流水線I/O和運行時參數」部分下。

+0

是的,但ValueProvider可以在其他非IO方法中使用。例如,該頁面上的「在您的函數中使用ValueProvider」示例在ParDo中使用ValueProvider值。所以解決這個問題的一種方法是修改DoFn中的查詢變量,但是我沒有看到一種方式將修改後的變量返回到主程序,以便可以在隨後的管道步驟中使用。 –

+0

與Java中發生的不同,Python中的BigQuery不使用自定義源。換句話說,它沒有在SDK中完全實現,但是也包含了後端的部分。目前,只有本地(不是自定義源)可以使用模板。有計劃將BigQuery添加爲自定義。 –

+0

我的意思是隻有自定義源可以使用模板。 (其餘是正確的)。我的意思是指定「本地」與「自定義」相反,所以本機來源不能使用模板(Python BigQuery的當前案例)。以下是將BigQuery作爲自定義源添加的計劃:https://issues.apache.org/jira/browse/BEAM-1440 –