2013-10-28 41 views
1

更新:我會馬上回答這個問題我自己(此代碼的工作):JSON上傳至BigQuery

基於關我的自定義上傳代碼:https://developers.google.com/bigquery/loading-data-into-bigquery#loaddatapostrequest

import sys 
import json 

from apiclient.discovery import build 
from oauth2client.file import Storage 
from oauth2client.client import AccessTokenRefreshError 
from oauth2client.client import OAuth2WebServerFlow 
from oauth2client.tools import run 
from apiclient.errors import HttpError 

import httplib2 

FLOW = OAuth2WebServerFlow(
    client_id='xxxxxxx.apps.googleusercontent.com', 
    client_secret='shhhhhhhhhhhh', 
    scope='https://www.googleapis.com/auth/bigquery', 
    user_agent='my-program-name/1.0') 

def loadTable(http, service): 
    projectId = 'drc-compute' 
    datasetId = 'standing' 
    import time 
    tableId = 'test_' + str(int(time.time())) 

    url = "https://www.googleapis.com/upload/bigquery/v2/projects/" + projectId + "/jobs" 
    schema = open('test_schema.json', 'r') 

    # Create the body of the request, separated by a boundary of xxx 
    newresource = ('--xxx\n' + 
      'Content-Type: application/json; charset=UTF-8\n' + '\n' + 
      '{\n' + 
      ' "configuration": {\n' + 
      '  "load": {\n' + 
      '  "sourceFormat": "NEWLINE_DELIMITED_JSON",\n' + 
      '  "schema": {\n' 
      '   "fields": ' + schema.read() + '\n' + 
      '  },\n' + 
      '  "destinationTable": {\n' + 
      '  "projectId": "' + projectId + '",\n' + 
      '  "datasetId": "' + datasetId + '",\n' + 
      '  "tableId": "' + tableId + '"\n' + 
      '  }\n' + 
      ' }\n' + 
      ' }\n' + 
      '}\n' + 
      '--xxx\n' + 
      'Content-Type: application/octet-stream\n' + 
      '\n') 

    # Append data from the specified file to the request body 
    f = open('test.json', 'r') 
    newresource += f.read().replace('\n', '\r\n') 

    # Signify the end of the body 
    newresource += ('--xxx--\n') 

    print newresource 

    headers = {'Content-Type': 'multipart/related; boundary=xxx'} 
    resp, content = http.request(url, method="POST", body=newresource, headers=headers) 

    if not resp.status == 200: 
    print resp 
    print content 
    else: 
    jsonResponse = json.loads(content) 
    jobReference = jsonResponse['jobReference']['jobId'] 
    import time 
    while True: 
    jobCollection = service.jobs() 
    getJob = jobCollection.get(projectId=projectId, jobId=jobReference).execute() 
    currentStatus = getJob['status']['state'] 

    if 'DONE' == currentStatus: 
     print "Done Loading!" 
     return 

    else: 
     print 'Waiting to load...' 
     print 'Current status: ' + currentStatus 
     print time.ctime() 
     time.sleep(10) 

def main(argv): 
    # If the credentials don't exist or are invalid, run the native client 
    # auth flow. The Storage object will ensure that if successful the good 
    # credentials will get written back to a file. 
    storage = Storage('bigquery2.dat') # Choose a file name to store the credentials. 
    credentials = storage.get() 
    if credentials is None or credentials.invalid: 
    credentials = run(FLOW, storage) 

    # Create an httplib2.Http object to handle our HTTP requests and authorize it 
    # with our good credentials. 
    http = httplib2.Http() 
    http = credentials.authorize(http) 

    service = build('bigquery','v2', http=http) 

    #datasets = service.datasets().list(projectId='917370487687').execute() 

    loadTable(http, service) 

if __name__ == '__main__': 
    main(sys.argv) 

你需要自己的BigQuery client_idclient_secret除了在機器上運行一次之外,您還可以打開瀏覽器並登錄到Google進行復制。然後bigquery2.dat將存儲oauth2刷新標記等。簡單的測試數據,我跟打就是:

test.json

{"asdf": "dd"} 
{"asdf": "ax"} 

test_schema.json

[ 
    { 
    "type": "STRING", 
    "name": "asdf", 
    "mode": "NULLABLE" 
    } 
] 
+0

基本上只記得給負載特性'NEWLINE_DELIMITED_JSON'內設置'sourceFormat'。 – noonien

回答

2

不離這個問題開,因爲你已經在問題部分回答 - 感謝@noonien評論:

「記得給負載特性中設置sourceFormat到 NEWLINE_DELIMITED_JSON」