2017-10-11 23 views
0

我想修改BigQuery中的單個列,並將更新的數據寫入新表,而無需手動保留所有其他列。我能做到,我想用下面的代碼做什麼:修改單個BigQuery列並寫入新表

row = p | 'ReadFromBigQuery' >> beam.io.Read(beam.io.BigQuerySource(query=query))  
new_row = row | beam.Map(lambda x: (x["col1"], x["col2"], preprocess(x["text_col"])) 
output = new_row | beam.Map(lambda (col1, col2, processed_text): {"col1": col1, "col2": col2, "text": processed_text} 

output | beam.io.WriteToBigQuery(path_to_new_table) 

然而,這需要我基本上寫,用手保持每列 - 如果我有100多個列(或真的連10+列)這會變得非常混亂和繁瑣得非常快。是否有更簡單的方法在一行上運行某個函數(本例中爲preprocess())並更新該列並仍保留其他列?

+0

提示:嘗試代表您的數據要素元組,但作爲字典。然後,您可以使用.copy()複製字典,然後根據需要修改副本,然後返回修改過的字典。 – jkff

+0

啊,所以這將是一個單獨的函數,接受一個字典,然後修改可以發生在那裏 – reese0106

+0

@jkff檢查我的答案,並讓我知道如果這是你的意圖:) – reese0106

回答

1

感謝@jkff,我想出瞭如何做到這一點。函數應該接受和接受一個字典,然後你可以修改字典的單個元素。喜歡的東西:

new_row = row | beam.Map(lambda x: preprocess_text(x, col_to_transform='text_column')` 

凡preprocess_text()會是這樣的:

def preprocess_text(row, col_to_transform): 
    row_copy = row.copy() 
    line = row_copy[col_to_transform] 
    line = ... # preprocessing transform goes here 
    row_copy[col_to_transform] = line 

    return row_copy 
+0

您需要複製一份而不是修改原始對象。修改傳遞給變換的當前元素是不安全的。 – jkff

+0

@jkff感謝您的建議。在預處理功能中完成複製是否安全?或者你會建議把它放在別的地方嗎? – reese0106

+0

我想不到一個不同的地方去做。傳遞給beam.Map,beam.FlatMap,beam.ParDo,beam.Filter等的函數都被禁止修改它們的輸入元素(或者,就此而言,修改它們已經輸出的元素) - 基本上,處理元素的PCollections作爲不可變的。所以 - 如果你想修改它,你必須在預處理函數中創建副本。 – jkff