2017-07-06 59 views
0

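I am trying to load a CSV file with schema auto-detection, but I am unable to load the file into BigQuery with the schema detected automatically. Can anyone help me with this?

Load a CSV file into BigQuery with schema auto-detection using the Python API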

Please find my code below:

import time

from google.cloud import bigquery


def load_data_from_file(dataset_name, table_name, source_file_name):
    bigquery_client = bigquery.Client()
    dataset = bigquery_client.dataset(dataset_name)
    table = dataset.table(table_name)
    table.reload()  # refresh the table's properties from the API
    with open(source_file_name, 'rb') as source_file:
        job = table.upload_from_file(
            source_file, source_format='text/csv')
    wait_for_job(job)
    print('Loaded {} rows into {}:{}.'.format(
        job.output_rows, dataset_name, table_name))


def wait_for_job(job):
    # Poll until the job finishes; raise if it finished with errors.
    while True:
        job.reload()
        if job.state == 'DONE':
            if job.error_result:
                raise RuntimeError(job.errors)
            return
        time.sleep(1)  # wait between polls instead of spinning
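The polling loop in wait_for_job never gives up if the job stays in a non-DONE state. A minimal sketch of the same loop with an upper bound on the waiting time follows. It assumes only that the job object exposes reload(), state, error_result and errors, as in the code above; the poll_interval and timeout parameters are hypothetical additions for illustration and are not part of the client library.

```python
import time


def wait_for_job(job, poll_interval=1.0, timeout=300.0):
    """Poll `job` until it is DONE, raising on failure or on timeout.

    `poll_interval` and `timeout` (both in seconds) are hypothetical
    parameters added for this sketch.
    """
    deadline = time.monotonic() + timeout
    while True:
        job.reload()  # refresh the job state
        if job.state == 'DONE':
            if job.error_result:
                raise RuntimeError(job.errors)
            return
        if time.monotonic() >= deadline:
            raise TimeoutError(
                'job did not finish within {} seconds'.format(timeout))
        time.sleep(poll_interval)  # pause between polls
```

Because the function only relies on those four attributes, it can be exercised with a small stand-in object before being pointed at a real load job.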
+0

Are you getting an error? If so, what is it?

Answers

0

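Currently, the Python client has no support for loading data from a file with a schema auto-detection flag (I plan to open a pull request to add this support, but I would first like to hear what the maintainers think of this implementation).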

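There are still some ways to work around this. I have not found a very elegant solution so far, but the following code lets you add the schema auto-detection flag to the load job: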

from google.cloud.bigquery import Client
import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'path/your/json.key'
import google.cloud.bigquery.table as mtable

# Replacement for the module-level helper that fills in the load job
# configuration. It is identical to the original except for the last line.
def _configure_job_metadata(metadata,
                            allow_jagged_rows,
                            allow_quoted_newlines,
                            create_disposition,
                            encoding,
                            field_delimiter,
                            ignore_unknown_values,
                            max_bad_records,
                            quote_character,
                            skip_leading_rows,
                            write_disposition):
    load_config = metadata['configuration']['load']

    if allow_jagged_rows is not None:
        load_config['allowJaggedRows'] = allow_jagged_rows

    if allow_quoted_newlines is not None:
        load_config['allowQuotedNewlines'] = allow_quoted_newlines

    if create_disposition is not None:
        load_config['createDisposition'] = create_disposition

    if encoding is not None:
        load_config['encoding'] = encoding

    if field_delimiter is not None:
        load_config['fieldDelimiter'] = field_delimiter

    if ignore_unknown_values is not None:
        load_config['ignoreUnknownValues'] = ignore_unknown_values

    if max_bad_records is not None:
        load_config['maxBadRecords'] = max_bad_records

    if quote_character is not None:
        load_config['quote'] = quote_character

    if skip_leading_rows is not None:
        load_config['skipLeadingRows'] = skip_leading_rows

    if write_disposition is not None:
        load_config['writeDisposition'] = write_disposition
    load_config['autodetect'] = True  # --> Here you can add the option for schema auto-detection

# Monkey-patch the library so that uploads use the function above.
mtable._configure_job_metadata = _configure_job_metadata

bq_client = Client()
ds = bq_client.dataset('dataset_name')
ds.table = lambda: mtable.Table('table_name', ds)
table = ds.table()

# source_file_name is the path to the CSV file, as in the question.
with open(source_file_name, 'rb') as source_file:
    job = table.upload_from_file(
        source_file, source_format='text/csv')
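The workaround above replaces a private library function with a full copy of it. A somewhat shorter variant of the same monkey-patching idea is to wrap the existing function and set the flag afterwards, so the library's own logic is not duplicated. The sketch below is only an illustration under that assumption: with_autodetect is a hypothetical helper name, and it relies on the patched function receiving the metadata dictionary as its first argument, as in the code above.

```python
import functools


def with_autodetect(configure):
    """Wrap a job-metadata configuration function so that it also
    switches on schema auto-detection (hypothetical helper)."""
    @functools.wraps(configure)
    def wrapper(metadata, *args, **kwargs):
        result = configure(metadata, *args, **kwargs)
        # Same effect as the extra line in the copied function above.
        metadata['configuration']['load']['autodetect'] = True
        return result
    return wrapper


# Intended use with the module from the answer (not executed here):
#   mtable._configure_job_metadata = with_autodetect(
#       mtable._configure_job_metadata)
```

Either way the patch depends on a private function of one specific client version, so it can break without notice when the library is upgraded.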
0

I just wanted to show how I have been using the Python client.

Below is my function that creates a table and loads it from a csv file.

Also, self.client is my bigquery.Client()

from google.cloud import bigquery
from google.cloud.bigquery import Dataset, LoadJobConfig
from google.cloud.exceptions import NotFound


def insertTable(self, datasetName, tableName, csvFilePath, schema=None):
    """
    This function creates a table in the given dataset in our default project
    and inserts the data given via a csv file.

    :param datasetName: The name of the dataset in which the table needs to be created
    :param tableName: The name of the table to be created
    :param csvFilePath: The path of the file to be inserted
    :param schema: The schema of the table to be created
    :return: returns nothing
    """
    dataset_ref = self.client.dataset(datasetName)
    dataset = Dataset(dataset_ref)

    table_ref = dataset.table(tableName)
    if schema is not None:
        table = bigquery.Table(table_ref, schema)
    else:
        table = bigquery.Table(table_ref)

    # Drop an existing table of the same name; ignore it if there is none.
    try:
        self.client.delete_table(table)
    except NotFound:
        pass

    table = self.client.create_table(table)

    job_config = LoadJobConfig()
    job_config.source_format = 'CSV'
    job_config.skip_leading_rows = 1  # skip the header row
    job_config.autodetect = True  # let BigQuery infer the schema
    with open(csvFilePath, 'rb') as csv_file:
        job = self.client.load_table_from_file(
            csv_file, table_ref, job_config=job_config)
        job.result()  # wait for the load job to finish
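For orientation, the three job_config settings used above correspond to fields under configuration.load in the job resource of the BigQuery REST API, the same dictionary that the first answer patches. The sketch below builds that dictionary by hand so the mapping is visible. build_load_configuration is a hypothetical helper written for illustration, not a client-library function, and the project, dataset and table names in the usage lines are placeholders.

```python
import json


def build_load_configuration(project, dataset_name, table_name,
                             skip_leading_rows=1, autodetect=True):
    """Return the load-job configuration as a plain dict
    (hypothetical helper, for illustration only)."""
    return {
        'configuration': {
            'load': {
                'destinationTable': {
                    'projectId': project,
                    'datasetId': dataset_name,
                    'tableId': table_name,
                },
                'sourceFormat': 'CSV',                 # job_config.source_format
                'skipLeadingRows': skip_leading_rows,  # job_config.skip_leading_rows
                'autodetect': autodetect,              # job_config.autodetect
            }
        }
    }


if __name__ == '__main__':
    # Placeholder names; replace them with real identifiers.
    config = build_load_configuration('my-project', 'my_dataset', 'my_table')
    print(json.dumps(config, indent=2))
```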

Let me know if this solves your problem.

+0

Where is the LoadJobConfig function/method defined?