Google Dataflow: read from Spanner

I would like to read a table from Google Cloud Spanner and write it to a text file to make a backup, using Google Dataflow with the Python SDK. I have written the following script:

from __future__ import absolute_import 

import argparse 
import itertools 
import logging 
import re 
import time 
import datetime as dt 

import apache_beam as beam 
from apache_beam.io import iobase 
from apache_beam.io import WriteToText 
from apache_beam.io.range_trackers import OffsetRangeTracker, UnsplittableRangeTracker 
from apache_beam.metrics import Metrics 
from apache_beam.options.pipeline_options import PipelineOptions 
from apache_beam.options.pipeline_options import StandardOptions, SetupOptions 
from apache_beam.options.pipeline_options import GoogleCloudOptions 

from google.cloud.spanner.client import Client 
from google.cloud.spanner.keyset import KeySet 

BUCKET_URL = 'gs://my_bucket' 
OUTPUT = '%s/output/' % BUCKET_URL 
PROJECT_ID = 'my_project' 
INSTANCE_ID = 'my_instance' 
DATABASE_ID = 'my_db' 
JOB_NAME = 'spanner-backup' 
TABLE = 'my_table' 


class SpannerSource(iobase.BoundedSource):
    def __init__(self):
        logging.info('Enter __init__')

        self.spannerOptions = {
            "id": PROJECT_ID,
            "instance": INSTANCE_ID,
            "database": DATABASE_ID
        }
        self.SpannerClient = Client

    def estimate_size(self):
        logging.info('Enter estimate_size')
        return 1

    def get_range_tracker(self, start_position=None, stop_position=None):
        logging.info('Enter get_range_tracker')
        if start_position is None:
            start_position = 0
        if stop_position is None:
            stop_position = OffsetRangeTracker.OFFSET_INFINITY

        range_tracker = OffsetRangeTracker(start_position, stop_position)
        return UnsplittableRangeTracker(range_tracker)

    def read(self, range_tracker):  # This is not called when using the DataflowRunner!
        logging.info('Enter read')
        # instantiate spanner client
        spanner_client = self.SpannerClient(self.spannerOptions["id"])
        instance = spanner_client.instance(self.spannerOptions["instance"])
        database = instance.database(self.spannerOptions["database"])

        # read from table
        table_fields = database.execute_sql("SELECT t.column_name FROM information_schema.columns AS t WHERE t.table_name = '%s'" % TABLE)
        table_fields.consume_all()
        self.columns = [x[0] for x in table_fields]
        keyset = KeySet(all_=True)
        results = database.read(table=TABLE, columns=self.columns, keyset=keyset)

        # iterate over rows
        results.consume_all()
        for row in results:
            JSON_row = {
                self.columns[i]: row[i] for i in range(len(self.columns))
            }
            yield JSON_row

    def split(self, start_position=None, stop_position=None):
        # this should not be called since the source is unsplittable
        logging.info('Enter split')
        if start_position is None:
            start_position = 0
        if stop_position is None:
            stop_position = 1

        # Because the source is unsplittable (for now), only a single source is returned
        yield iobase.SourceBundle(
            weight=1,
            source=self,
            start_position=start_position,
            stop_position=stop_position)


def run(argv=None): 
    """Main entry point""" 
    pipeline_options = PipelineOptions() 
    google_cloud_options = pipeline_options.view_as(GoogleCloudOptions) 
    google_cloud_options.project = PROJECT_ID 
    google_cloud_options.job_name = JOB_NAME 
    google_cloud_options.staging_location = '%s/staging' % BUCKET_URL 
    google_cloud_options.temp_location = '%s/tmp' % BUCKET_URL 

    #pipeline_options.view_as(StandardOptions).runner = 'DirectRunner' 
    pipeline_options.view_as(StandardOptions).runner = 'DataflowRunner' 
    p = beam.Pipeline(options=pipeline_options) 

    output = p | 'Get Rows from Spanner' >> beam.io.Read(SpannerSource()) 
    iso_datetime = dt.datetime.now().replace(microsecond=0).isoformat() 
    output | 'Store in GCS' >> WriteToText(file_path_prefix=OUTPUT + iso_datetime + '-' + TABLE, file_name_suffix='') # if this line is commented, job completes but does not do anything 


    result = p.run() 
    result.wait_until_finish() 


if __name__ == '__main__': 
    logging.getLogger().setLevel(logging.INFO) 
    run() 

However, this script only runs correctly with the DirectRunner: when I run it with the DataflowRunner, it runs for a while and then exits with the following error, without producing any output:

"Executing failure step failure14 [...] Workflow failed. Causes: [...] The worker lost contact with the service."

Sometimes it just keeps running forever without creating any output. Moreover, if I comment out the line 'output = ...', the job completes, but without actually reading any data.

It also seems that the DataflowRunner calls the source's 'estimate_size' function, but not its 'read' or 'get_range_tracker' functions.
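
For context, my understanding of the BoundedSource contract (from the Beam custom-sources documentation) is that the runner first calls estimate_size and split to create bundles, and only calls read, with a range tracker obtained from get_range_tracker, on the workers. A minimal sketch of that interface on a toy counting source, kept separate from my Spanner code (the class and the numbers in it are purely illustrative):

import apache_beam as beam
from apache_beam.io import iobase
from apache_beam.io.range_trackers import OffsetRangeTracker

class CountingSource(iobase.BoundedSource):
    """Toy source emitting the integers 0 .. count-1."""
    def __init__(self, count):
        self._count = count

    def estimate_size(self):
        # rough size in bytes; the runner uses this to decide how to split the work
        return self._count * 8

    def get_range_tracker(self, start_position, stop_position):
        if start_position is None:
            start_position = 0
        if stop_position is None:
            stop_position = self._count
        return OffsetRangeTracker(start_position, stop_position)

    def read(self, range_tracker):
        # runs on the workers, once per bundle
        for i in range(range_tracker.start_position(), range_tracker.stop_position()):
            if not range_tracker.try_claim(i):
                return
            yield i

    def split(self, desired_bundle_size, start_position=None, stop_position=None):
        # the runner passes a desired_bundle_size when splitting the source
        if start_position is None:
            start_position = 0
        if stop_position is None:
            stop_position = self._count
        bundle_start = start_position
        while bundle_start < stop_position:
            bundle_stop = min(bundle_start + desired_bundle_size, stop_position)
            yield iobase.SourceBundle(
                weight=bundle_stop - bundle_start,
                source=self,
                start_position=bundle_start,
                stop_position=bundle_stop)
            bundle_start = bundle_stop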

Does anyone have an idea of what might be causing this? I know there is a (more complete) Java SDK with an experimental Spanner source/sink, but I would rather use Python if possible.

Thanks


We have prioritized the Java Dataflow connector first; I would suggest either using Java or implementing a Python connector yourself using ParDos, see https://beam.apache.org/documentation/sdks/python-custom-io/ – Mairbek Khadikov


Thanks @MairbekKhadikov. I will try the ParDo approach for now.

Answer


Following the suggestion, I have reworked my code to simply use a ParDo instead of the BoundedSource class. For reference, here is my solution; I am sure there are many ways to improve it, and I would be happy to hear suggestions. In particular, I was surprised that I have to create a dummy PCollection when starting the pipeline (if I don't, I get the error

AttributeError: 'PBegin' object has no attribute 'windowing'

which I could not work around). Creating the dummy PCollection feels a bit like a hack.
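
To make the workaround concrete: a ParDo consumes a PCollection, so the pipeline needs a transform that accepts the bare pipeline (a PBegin) as input, and beam.Create is the usual way to produce such a seed element. A minimal sketch of the pattern, with a placeholder DoFn instead of my actual Spanner code:

import apache_beam as beam

class EmitTableNames(beam.DoFn):
    # placeholder DoFn: the real pipeline queries Spanner here instead
    def process(self, element):
        yield 'my_table'

p = beam.Pipeline()
seed   = p    | 'Begin pipeline' >> beam.Create(['seed'])          # Create accepts PBegin and yields a PCollection
tables = seed | 'Get tables'     >> beam.ParDo(EmitTableNames())   # ParDo needs a PCollection as input
p.run().wait_until_finish()

The full solution below uses exactly this pattern, with the seed element feeding the DoFn that lists the Spanner tables.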

from __future__ import absolute_import 

import datetime as dt 
import logging 

import apache_beam as beam 
from apache_beam.io import WriteToText 
from apache_beam.options.pipeline_options import PipelineOptions 
from apache_beam.options.pipeline_options import StandardOptions, SetupOptions 
from apache_beam.options.pipeline_options import GoogleCloudOptions 
from google.cloud.spanner.client import Client 
from google.cloud.spanner.keyset import KeySet 

BUCKET_URL = 'gs://my_bucket' 
OUTPUT = '%s/some_folder/' % BUCKET_URL 
PROJECT_ID = 'my_project' 
INSTANCE_ID = 'my_instance' 
DATABASE_ID = 'my_database' 
JOB_NAME = 'my_jobname' 

class ReadTables(beam.DoFn):
    def __init__(self, project, instance, database):
        super(ReadTables, self).__init__()
        self._project = project
        self._instance = instance
        self._database = database

    def process(self, element):
        # get list of tables in the database
        table_names_row = Client(self._project).instance(self._instance).database(self._database).execute_sql('SELECT t.table_name FROM information_schema.tables AS t')
        for row in table_names_row:
            if row[0] in [u'COLUMNS', u'INDEXES', u'INDEX_COLUMNS', u'SCHEMATA', u'TABLES']:  # skip these
                continue
            yield row[0]

class ReadSpannerTable(beam.DoFn):
    def __init__(self, project, instance, database):
        super(ReadSpannerTable, self).__init__()
        self._project = project
        self._instance = instance
        self._database = database

    def process(self, element):
        # first read the columns present in the table
        table_fields = Client(self._project).instance(self._instance).database(self._database).execute_sql("SELECT t.column_name FROM information_schema.columns AS t WHERE t.table_name = '%s'" % element)
        columns = [x[0] for x in table_fields]

        # next, read the actual data in the table
        keyset = KeySet(all_=True)
        results_streamed_set = Client(self._project).instance(self._instance).database(self._database).read(table=element, columns=columns, keyset=keyset)

        for row in results_streamed_set:
            JSON_row = {columns[i]: row[i] for i in xrange(len(columns))}
            yield (element, JSON_row)   # output pairs of (table_name, data)

def run(argv=None): 
    """Main entry point""" 
    pipeline_options = PipelineOptions() 
    pipeline_options.view_as(SetupOptions).save_main_session = True 
    pipeline_options.view_as(SetupOptions).requirements_file = "requirements.txt" 
    google_cloud_options = pipeline_options.view_as(GoogleCloudOptions) 
    google_cloud_options.project = PROJECT_ID
    google_cloud_options.job_name = JOB_NAME 
    google_cloud_options.staging_location = '%s/staging' % BUCKET_URL 
    google_cloud_options.temp_location = '%s/tmp' % BUCKET_URL 

    pipeline_options.view_as(StandardOptions).runner = 'DataflowRunner' 
    p = beam.Pipeline(options=pipeline_options) 

    init = p  | 'Begin pipeline'    >> beam.Create(["test"])             # have to create a dummy transform to initialize the pipeline, surely there is a better way ? 
    tables = init | 'Get tables from Spanner'  >> beam.ParDo(ReadTables(PROJECT_ID, INSTANCE_ID, DATABASE_ID))   # read the tables in the db
    rows = (tables | 'Get rows from Spanner table' >> beam.ParDo(ReadSpannerTable(PROJECT_ID, INSTANCE_ID, DATABASE_ID)) # for each table, read the entries
        | 'Group by table'    >> beam.GroupByKey() 
        | 'Formatting'     >> beam.Map(lambda (table_name, rows): (table_name, list(rows))))  # have to force to list here (dataflowRunner produces _Unwindowedvalues) 

    iso_datetime = dt.datetime.now().replace(microsecond=0).isoformat() 
    rows    | 'Store in GCS'    >> WriteToText(file_path_prefix=OUTPUT + iso_datetime, file_name_suffix='') 

    result = p.run() 
    result.wait_until_finish() 

if __name__ == '__main__': 
    logging.getLogger().setLevel(logging.INFO) 
    run()
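
One practical note: since the DoFns import the Spanner client library on the workers, the requirements.txt staged via the requirements_file option above has to list that dependency. At a minimum, something like this (exact contents and version pins depend on your environment):

google-cloud-spanner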