我想讀取一個csv文件並使用apache beam數據流將其寫入BigQuery。爲了做到這一點,我需要以字典的形式將數據呈現給BigQuery。我如何使用apache beam來轉換數據以執行此操作?如何將csv轉換爲apache beam數據流中的字典
我的輸入csv文件有兩列,我想在BigQuery中創建一個後續的兩列表。我知道如何在BigQuery中創建數據,這是直接的,我不知道如何將csv轉換爲字典。下面的代碼是不正確的,但應該告訴我想要做什麼。
# Standard imports
import apache_beam as beam
# Create a pipeline executing on a direct runner (local, non-cloud).
p = beam.Pipeline('DirectPipelineRunner')
# Create a PCollection with names and write it to a file.
(p
| 'read solar data' >> beam.Read(beam.io.TextFileSource('./sensor1_121116.csv'))
# How do you do this??
| 'convert to dictionary' >> beam.Map(lambda (k, v): {'luminosity': k, 'datetime': v})
| 'save' >> beam.Write(
beam.io.BigQuerySink(
output_table,
schema='month:INTEGER, tornado_count:INTEGER',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run()
非常感謝巴勃羅,這個作品非常好!這裏是一個代碼片段,以防人們在尋找完整性(p |'read solar data'>> beam.Read(CsvFileSource('./ sensor1_121116.csv')) |'save'>> beam.Write(beam .io.TextFileSink('./ greetings_solar'))) – user1753640
我正在嘗試將結果寫入BigQuery,但沒有運氣,表格被創建但沒有數據。你能告訴發生了什麼事嗎?這裏是一個片段(p |'read solar data'>> beam.Read(CsvFileSource('./ sensor1_121116.csv')) |'save'>> beam.Write( beam.io.BigQuerySink( output_table , 模式= 'lumosity:INTEGER,時間:INTEGER', create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED, write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE))) – user1753640
@ user1753640:我有同樣的問題,不得不在將數據存儲到GBQ之前,使用匹配模式的字典。 – vdolez