1
更新:我乾脆自己回答這個問題(以下程式碼可以正常運作):將 JSON 上傳至 BigQuery
我的自訂上傳程式碼是根據這份文件撰寫的:https://developers.google.com/bigquery/loading-data-into-bigquery#loaddatapostrequest
import sys
import json
from apiclient.discovery import build
from oauth2client.file import Storage
from oauth2client.client import AccessTokenRefreshError
from oauth2client.client import OAuth2WebServerFlow
from oauth2client.tools import run
from apiclient.errors import HttpError
import httplib2
# OAuth 2.0 flow used by main() when no valid cached credentials exist.
# The client_id / client_secret values below are placeholders: substitute the
# ones issued for your own project. The scope requests BigQuery access.
FLOW = OAuth2WebServerFlow(
    client_id='xxxxxxx.apps.googleusercontent.com',
    client_secret='shhhhhhhhhhhh',
    scope='https://www.googleapis.com/auth/bigquery',
    user_agent='my-program-name/1.0',
)
def loadTable(http, service):
    """Load ``test.json`` into a new BigQuery table with a multipart upload.

    The schema is read from ``test_schema.json`` and the newline-delimited
    JSON rows from ``test.json`` (both in the current working directory).
    They are POSTed as a ``multipart/related`` load job to the BigQuery
    upload endpoint, and the job is then polled every 10 seconds until its
    state is ``DONE``.

    Args:
        http: Authorized HTTP object; ``http.request(url, method=, body=,
            headers=)`` must return a ``(response, content)`` pair where
            ``response.status`` is the HTTP status code.
        service: BigQuery API service object; ``service.jobs().get(...)
            .execute()`` is used to poll the job.

    Returns:
        None. Progress and results are reported with ``print``. When the
        upload request does not return HTTP 200, the response and its content
        are printed and no polling takes place.
    """
    # Function-scope import, as in the original (the module does not import
    # ``time`` at file level).
    import time

    projectId = 'drc-compute'
    datasetId = 'standing'
    # Timestamp suffix gives every run its own destination table.
    tableId = 'test_' + str(int(time.time()))
    url = "https://www.googleapis.com/upload/bigquery/v2/projects/" + projectId + "/jobs"

    # ``with`` closes both files even if reading or parsing fails.
    with open('test_schema.json', 'r') as schemaFile:
        schemaFields = json.load(schemaFile)
    with open('test.json', 'r') as dataFile:
        data = dataFile.read()

    # Normalise to CRLF line endings without doubling an existing '\r'.
    data = data.replace('\r\n', '\n').replace('\n', '\r\n')
    # The closing delimiter must start on its own line; without this a data
    # file lacking a trailing newline would have '--xxx--' glued to its last
    # record.
    if data and not data.endswith('\r\n'):
        data += '\r\n'

    # Serialise the job configuration with json.dumps instead of hand-built
    # string concatenation, so quoting and escaping are always valid JSON.
    jobConfig = {
        'configuration': {
            'load': {
                'sourceFormat': 'NEWLINE_DELIMITED_JSON',
                'schema': {'fields': schemaFields},
                'destinationTable': {
                    'projectId': projectId,
                    'datasetId': datasetId,
                    'tableId': tableId,
                },
            },
        },
    }

    # Request body: two parts separated by the boundary "xxx" (job metadata,
    # then the raw rows), terminated by the closing delimiter.
    newresource = ''.join([
        '--xxx\n',
        'Content-Type: application/json; charset=UTF-8\n',
        '\n',
        json.dumps(jobConfig) + '\n',
        '--xxx\n',
        'Content-Type: application/octet-stream\n',
        '\n',
        data,
        '--xxx--\n',
    ])
    print(newresource)

    headers = {'Content-Type': 'multipart/related; boundary=xxx'}
    resp, content = http.request(url, method="POST", body=newresource, headers=headers)
    if resp.status != 200:
        print(resp)
        print(content)
        return

    jobId = json.loads(content)['jobReference']['jobId']
    jobCollection = service.jobs()
    while True:
        jobStatus = jobCollection.get(projectId=projectId, jobId=jobId).execute()['status']
        currentStatus = jobStatus['state']
        if currentStatus == 'DONE':
            # A finished job is not necessarily a successful one: BigQuery
            # reports failures through status.errorResult.
            if 'errorResult' in jobStatus:
                print('Load failed: ' + json.dumps(jobStatus['errorResult']))
            else:
                print("Done Loading!")
            return
        print('Waiting to load...')
        print('Current status: ' + currentStatus)
        print(time.ctime())
        time.sleep(10)
def main(argv):
    """Obtain BigQuery credentials and run the sample load job.

    ``argv`` is accepted for the script entry point but is not read.
    """
    # Credentials are cached in this file between runs.
    credentialStore = Storage('bigquery2.dat')
    credentials = credentialStore.get()

    # No usable cached credentials: run the native client auth flow, handing
    # it the store so that good credentials can be written back to the file.
    needsAuth = credentials is None or credentials.invalid
    if needsAuth:
        credentials = run(FLOW, credentialStore)

    # Wrap a fresh httplib2.Http object so every request carries the
    # credentials, then build the BigQuery v2 service on top of it.
    authorizedHttp = credentials.authorize(httplib2.Http())
    bigquery = build('bigquery', 'v2', http=authorizedHttp)
    loadTable(authorizedHttp, bigquery)
# Run the upload only when this file is executed as a script.
if __name__ == "__main__":
    main(sys.argv)
你需要使用自己的 BigQuery client_id 和 client_secret。
除此之外,請先在一台可以開啟瀏覽器並登入 Google 的機器上執行一次,以完成授權。之後 bigquery2.dat 會儲存 OAuth2 的 refresh token 等資訊。我使用的簡單測試資料如下:
test.json
{"asdf": "dd"}
{"asdf": "ax"}
test_schema.json
[
{
"type": "STRING",
"name": "asdf",
"mode": "NULLABLE"
}
]
基本上只要記得在 load 設定中把 'sourceFormat' 設為 'NEWLINE_DELIMITED_JSON' 即可。 – noonien