我從 BigQuery 下載了一個表格到 PySpark RDD 中,如下所示。我該如何將這個 PySpark RDD 再上傳回 BigQuery?
# GCS staging configuration: the bucket the BigQuery Hadoop connector uses to
# export table data before Spark reads it.
dGSConfig = {
'project_id': "project_id",
'bucket': "bucket_id"
}
# BigQuery source table identification, plus the GCS config it stages through.
dBQConfig = {
'gs_config': dGSConfig,
'project_id': "project_id",
'dataset_id': "dataset_id",
'table_id': "table_id"
}
# NOTE(review): these two calls reference functions that are defined *below*
# this point in the file. Run top-to-bottom as a script this raises NameError —
# move the function definitions above these calls (or wrap this driver code in
# an `if __name__ == "__main__":` guard at the end of the file) before running.
oSc = instantiate_pyspark()
rddData, lsHeadings = get_table_cloud(oSc, dBQConfig) #rddData has a list-of-lists type format
def instantiate_pyspark():
    """Create and return a SparkContext for use with the GCS/BigQuery connector.

    Returns:
        pyspark.SparkContext: a freshly constructed Spark context.
    """
    # Local import keeps the module importable on machines without pyspark.
    import pyspark

    context = pyspark.SparkContext()
    # Read the connector's system-bucket setting from the Hadoop configuration.
    # The value is deliberately discarded — presumably this is a cheap probe
    # that the GCS connector is configured; TODO(review): confirm intent.
    context._jsc.hadoopConfiguration().get("fs.gs.system.bucket")
    return context
def get_table_cloud(oSc, dBQConfig):
    """Read a BigQuery table into a PySpark RDD via the GCS BigQuery connector.

    Config format:
        dGSConfig = {'project_id': '', 'bucket': ''}
        dBQConfig = {'gs_config': dGSConfig, 'project_id': '',
                     'dataset_id': '', 'table_id': ''}

    Args:
        oSc: an active ``pyspark.SparkContext``.
        dBQConfig: dict in the format above; ``gs_config`` names the GCS
            staging bucket the connector exports through.

    Returns:
        tuple: ``(rddDataset, lsHeadings)`` where ``rddDataset`` is an RDD of
        per-row value lists and ``lsHeadings`` is the list of column names
        (empty list for an empty table).
    """
    import json

    dGSConfig = dBQConfig['gs_config']
    dConf = {
        "mapred.bq.project.id": dGSConfig['project_id'],
        "mapred.bq.gcs.bucket": dGSConfig['bucket'],
        "mapred.bq.input.project.id": dBQConfig['project_id'],
        "mapred.bq.input.dataset.id": dBQConfig['dataset_id'],
        "mapred.bq.input.table.id": dBQConfig['table_id'],
    }

    # Each record arrives as (LongWritable key, JSON-text row).
    rddDatasetRaw = oSc.newAPIHadoopRDD(
        "com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
        "org.apache.hadoop.io.LongWritable",
        "com.google.gson.JsonObject",
        conf=dConf,
    )

    # Derive column headings from the first record. Guard against an empty
    # table: take(1) returns [] there, and indexing it would raise IndexError.
    lsFirst = rddDatasetRaw.take(1)
    lsHeadings = list(json.loads(lsFirst[0][1]).keys()) if lsFirst else []

    # Materialize each row's values as a real list (not a dict_values view) so
    # the records pickle cleanly and support indexing. ``json=json`` binds the
    # module as a default argument so the lambda serializes for the executors.
    # NOTE: headings/values alignment relies on dict insertion order being
    # stable per record (guaranteed on Python 3.7+).
    rddDataset = rddDatasetRaw.map(
        lambda t, json=json: list(json.loads(t[1]).values())
    )
    return rddDataset, lsHeadings