
Uploading a PySpark RDD to BigQuery

I downloaded a table from BigQuery into a PySpark RDD as shown below. How do I upload it back to BigQuery?

dGSConfig = { 
    'project_id': "project_id", 
    'bucket': "bucket_id" 
} 
dBQConfig = { 
    'gs_config': dGSConfig, 
    'project_id': "project_id", 
    'dataset_id': "dataset_id", 
    'table_id': "table_id" 
} 

oSc = instantiate_pyspark() 
rddData, lsHeadings = get_table_cloud(oSc, dBQConfig) #rddData has a list-of-lists type format 




def instantiate_pyspark(): 
    """instantiate the pyspark RDD stuff""" 
    import pyspark 

    oSc = pyspark.SparkContext() 
    oHadoopConf = oSc._jsc.hadoopConfiguration()
    # look up the GCS connector's system bucket (return value not used here)
    oHadoopConf.get("fs.gs.system.bucket")

    return oSc 


def get_table_cloud(oSc, dBQConfig): 
    """get a table from bigquery via google cloud storage 
    Config format: 
     dGSConfig = {'project_id': '', 'bucket': ''}
     dBQConfig = {'gs_config': dGSConfig, 'project_id': '', 'dataset_id': '', 'table_id': ''}
    """ 
    dGSConfig = dBQConfig['gs_config'] 

    dConf = {
        "mapred.bq.project.id": dGSConfig['project_id'],
        "mapred.bq.gcs.bucket": dGSConfig['bucket'],
        "mapred.bq.input.project.id": dBQConfig['project_id'],
        "mapred.bq.input.dataset.id": dBQConfig['dataset_id'],
        "mapred.bq.input.table.id": dBQConfig['table_id'],
    }

    rddDatasetRaw = oSc.newAPIHadoopRDD(
        "com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
        "org.apache.hadoop.io.LongWritable",
        "com.google.gson.JsonObject",
        conf=dConf,
    )

    # use the first record to recover the column names
    import json
    lsHeadings = json.loads(rddDatasetRaw.take(1)[0][1]).keys()

    # each record's value is a JSON string; keep just the field values
    rddDataset = (
        rddDatasetRaw
        .map(lambda t, json=json: json.loads(t[1]).values())
    )

    return rddDataset, lsHeadings 

Answer


Three methods I have used at various points:

1) Write a local CSV, upload it to Google Cloud Storage with gsutil, and load it into BigQuery in a separate step (sketched after this code):

llData = rddData.collect() 


import csv

with open(sCsvPath, 'w') as f:
    oWriter = csv.writer(f)
    for lData in llData:
        oWriter.writerow(lData)

import subprocess 
lsCommand = ['gsutil', 'cp', sCsvPath, sGooglePath] 
subprocess.check_output(lsCommand) 
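
The load into BigQuery then happens as a separate step. A minimal sketch of that step, assuming the bq command-line tool is installed and that sGooglePath and the question's dBQConfig are in scope; the schema string is a placeholder and has to match the columns written to the CSV:

import subprocess

# hypothetical load step: ingest the CSV copied to sGooglePath into the target table
sTablePath = "{}:{}.{}".format(dBQConfig['project_id'],
                               dBQConfig['dataset_id'],
                               dBQConfig['table_id'])
lsCommand = ['bq', 'load', '--source_format=CSV', '--replace',
             sTablePath, sGooglePath, 'field1:STRING,field2:FLOAT']
subprocess.check_output(lsCommand)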

2) Upload directly to BigQuery with pandas:

import pandas as pd 
dfData = pd.DataFrame(llData, columns=lsHeadings) 

sProjectID = dBQConfig['project_id']
sTargetDataset = dBQConfig['dataset_id']
sTargetTable = dBQConfig['table_id']

sTablePath = "{}.{}".format(sTargetDataset, sTargetTable) 
dfData.to_gbq(sTablePath, sProjectID, if_exists='replace') 

3) Save the distributed results straight to Google Cloud Storage with pyspark (a newline-delimited JSON variant is sketched after this code):

# remove the previous output dir (note: gsutil rm fails if the path does not exist)
import subprocess
lsCommand = ['gsutil', 'rm', '-r', sGooglePath]
subprocess.check_output(lsCommand)

rddData.saveAsTextFile(sGooglePath)
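
If BigQuery should be able to ingest those files directly, one option is to write each row as newline-delimited JSON instead of the plain text repr. A rough sketch under that assumption (it also assumes the field order in rddData matches lsHeadings, as in the question's get_table_cloud):

import json

# hypothetical variant: serialize each row as a JSON object keyed by column name
rddJson = rddData.map(
    lambda lRow, json=json, lsHeadings=lsHeadings:
        json.dumps(dict(zip(lsHeadings, lRow)))
)
rddJson.saveAsTextFile(sGooglePath)

The part files under sGooglePath can then be loaded with the bq CLI as in method 1, using --source_format=NEWLINE_DELIMITED_JSON.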

None of these is what I originally wanted, though, which was a way to upload the results to BQ directly from PySpark.