2016-09-24 152 views

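Writing to BigQuery from within a ParDo function

I want to call the beam.io.Write(beam.io.BigQuerySink(..)) operation from within a ParDo function to generate a separate BigQuery table for each key in a PCollection (using the Python SDK). Here are two similar threads, which unfortunately did not help: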

1) https://stackoverflow.com/questions/31156774/about-key-grouping-with-groupbykey

2) Dynamic table name when writing to BQ from dataflow pipelines

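When I run the code below, the rows for the first key are inserted into BigQuery, and then the pipeline fails with the error shown below. I would really appreciate any suggestions about what I am doing wrong or how to fix it.

Pipeline code: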

rows = p | 'read_bq_table' >> beam.io.Read(beam.io.BigQuerySource(query=query)) 

class par_upload(beam.DoFn):

    def process(self, context):
        key, value = context.element

        ### This block causes issues ###
        value | 'write_to_bq' >> beam.io.Write(
            beam.io.BigQuerySink(
                'PROJECT-NAME:analytics.first_table',  # will be replaced by a dynamic name based on key
                schema=schema,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
            )
        )
        ### End block ######
        return [value]


### Following part works fine ###
filtered = (rows
            | 'filter_rows' >> beam.Filter(lambda row: row['topic'] == 'analytics')
            | 'apply_projection' >> beam.Map(apply_projection, projection_fields)
            | 'group_by_key' >> beam.GroupByKey()
            | 'par_upload_to_bigquery' >> beam.ParDo(par_upload())
            | 'flat_map' >> beam.FlatMap(lambda l: l)  # this step is just for testing
            )

### This part works fine if I comment out the 'write_to_bq' block above
filtered | 'write_to_bq' >> beam.io.Write(
    beam.io.BigQuerySink(
        'PROJECT-NAME:analytics.another_table',
        schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
)

Error message:

INFO:oauth2client.client:Attempting refresh to obtain initial access_token 
INFO:oauth2client.client:Attempting refresh to obtain initial access_token 
INFO:root:Writing 1 rows to PROJECT-NAME:analytics.first_table table. 
INFO:root:Final: Debug counters: {'element_counts': Counter({'CreatePInput0': 1, 'write_to_bq/native_write': 1})} 
ERROR:root:Error while visiting par_upload_to_bigquery 
Traceback (most recent call last): 
    File "split_events.py", line 137, in <module> 
    run() 
    File "split_events.py", line 132, in run 
    p.run() 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/pipeline.py", line 159, in run 
    return self.runner.run(self) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 102, in run 
    super(DirectPipelineRunner, self).run(pipeline) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 98, in run 
    pipeline.visit(RunVisitor(self)) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/pipeline.py", line 182, in visit 
    self._root_transform().visit(visitor, self, visited) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/pipeline.py", line 419, in visit 
    part.visit(visitor, pipeline, visited) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/pipeline.py", line 422, in visit 
    visitor.visit_transform(self) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 93, in visit_transform 
    self.runner.run_transform(transform_node) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 168, in run_transform 
    return m(transform_node) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 98, in func_wrapper 
    func(self, pvalue, *args, **kwargs) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 180, in run_ParDo 
    runner.process(v) 
    File "apache_beam/runners/common.py", line 133, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:4483) 
    File "apache_beam/runners/common.py", line 139, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:4311) 
    File "apache_beam/runners/common.py", line 150, in apache_beam.runners.common.DoFnRunner.reraise_augmented (apache_beam/runners/common.c:4677) 
    File "apache_beam/runners/common.py", line 137, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:4245) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/typehints/typecheck.py", line 149, in process 
    return self.run(self.dofn.process, context, args, kwargs) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/typehints/typecheck.py", line 134, in run 
    result = method(context, *args, **kwargs) 
    File "split_events.py", line 73, in process 
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 724, in __ror__ 
    return self.transform.__ror__(pvalueish, self.label) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 445, in __ror__ 
    return _MaterializePValues(cache).visit(result) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 105, in visit 
    return self._pvalue_cache.get_unwindowed_pvalue(node) 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 262, in get_unwindowed_pvalue 
    return [v.value for v in self.get_pvalue(pvalue)] 
    File "/Users/dimitri/anaconda/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 244, in get_pvalue 
    value_with_refcount = self._cache[self.key(pvalue)] 
KeyError: "(4384177040, None) [while running 'par_upload_to_bigquery']" 

Edit (after the first answer):

I had not realized that what I need is a PCollection.

I have now changed my code to this (which is probably very inefficient):

key_pipe = p | 'pipe_' + key >> beam.Create(value) 
key_pipe | 'write_' + key >> beam.io.Write(beam.io.BigQuerySink(..)) 

This now works fine locally, but not with the BlockingDataflowPipelineRunner :-(

The pipeline fails with the following error:

JOB_MESSAGE_ERROR: (979394c29490e588): Traceback (most recent call last): 
    File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 474, in do_work 
    work_executor.execute() 
    File "dataflow_worker/executor.py", line 901, in dataflow_worker.executor.MapTaskExecutor.execute (dataflow_worker/executor.c:24331) 
    op.start() 
    File "dataflow_worker/executor.py", line 465, in dataflow_worker.executor.DoOperation.start (dataflow_worker/executor.c:14193) 
    def start(self): 
    File "dataflow_worker/executor.py", line 469, in dataflow_worker.executor.DoOperation.start (dataflow_worker/executor.c:13499) 
    fn, args, kwargs, tags_and_types, window_fn = (
ValueError: too many values to unpack (expected 5) 

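I think the first answer is still correct: you cannot add more steps to a pipeline from within a DoFn that is running as part of that pipeline. The fact that this works in the DirectRunner is a bug. As the other threads indicate, if you want to perform this kind of data-dependent write, you need to interact with the BigQuery API directly rather than using BigQuerySink, for now.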
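
For readers on later Apache Beam releases than the one used in the question: a pipeline-level write can choose its destination per element, which avoids applying a transform inside a DoFn. The sketch below assumes that beam.io.WriteToBigQuery accepts a callable for its table argument (as documented for later Python SDK releases) and that each row is a dict with an event_type field. The names table_for_row, write_per_type, event_type, and the events_ table prefix are hypothetical and only serve the illustration.

```python
def table_for_row(row):
    """Pick the destination table for one row (naming scheme is illustrative)."""
    # 'event_type' is an assumed field; use whatever field holds the key.
    return 'PROJECT-NAME:analytics.events_{}'.format(row['event_type'])


def write_per_type(rows, schema):
    """Apply one pipeline-level write whose table is chosen per element."""
    # Imported here so table_for_row can be exercised without Beam installed.
    import apache_beam as beam

    return rows | 'write_per_type' >> beam.io.WriteToBigQuery(
        table=table_for_row,  # callable: receives an element, returns a table name
        schema=schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
```

Note that this is applied to the ungrouped PCollection of rows while the pipeline is being built, so no GroupByKey is needed just to split the output.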

Answer


In the similar threads, the only suggestion for writing to BigQuery inside a ParDo is to use the BigQuery API directly, or to use a client library.

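The code you wrote applies the beam.io.Write(beam.io.BigQuerySink(...)) transform inside a DoFn. That transform expects a PCollection as input, like filtered in the working code example. Inside the DoFn, value is the plain iterable of grouped elements for one key, not a PCollection, so the write cannot be applied there.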

I think the simplest option is to look at the gcloud-python BigQuery function insert_data() and call it inside a ParDo.
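
A minimal sketch of that approach follows, under several assumptions. It uses Client.insert_rows_json from the current google-cloud-bigquery package in place of the older insert_data() named above. It assumes that the per-key tables already exist and that each row is a JSON-serializable dict. Its process method receives the element directly, as in current Beam releases, rather than the context object used in the question. WritePerKey, table_id_for_key, the events_ prefix, and the batch size are hypothetical choices, not part of either library.

```python
import re

try:
    import apache_beam as beam
    DoFnBase = beam.DoFn
except ImportError:  # keeps the sketch importable where Beam is not installed
    DoFnBase = object


def table_id_for_key(project, dataset, key):
    """Build a 'project.dataset.table' ID from a key."""
    # Keep only letters, digits and underscores, a conservative subset
    # of the characters allowed in table names.
    safe_key = re.sub(r'[^A-Za-z0-9_]', '_', str(key))
    return '{}.{}.events_{}'.format(project, dataset, safe_key)


class WritePerKey(DoFnBase):
    """Streams each (key, rows) group into its own, already existing, table."""

    def __init__(self, project, dataset, batch_size=500):
        super().__init__()
        self.project = project
        self.dataset = dataset
        self.batch_size = batch_size
        self.client = None

    def setup(self):
        # Create the client on the worker, not while building the pipeline.
        from google.cloud import bigquery
        self.client = bigquery.Client(project=self.project)

    def process(self, element):
        key, rows = element
        rows = list(rows)  # the grouped value is a plain iterable
        table_id = table_id_for_key(self.project, self.dataset, key)
        for start in range(0, len(rows), self.batch_size):
            batch = rows[start:start + self.batch_size]
            errors = self.client.insert_rows_json(table_id, batch)
            if errors:
                raise RuntimeError(
                    'insert into {} failed: {}'.format(table_id, errors))
        yield key, len(rows)
```

In the pipeline from the question, this would replace par_upload, for example beam.ParDo(WritePerKey('PROJECT-NAME', 'analytics')) applied after the group_by_key step. Because the write happens as a side effect of process, a retried bundle can insert the same rows again, so this suits append-style tables better than ones that must stay free of duplicates.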