2012-06-10 93 views

回答

17

您無法直接在DataStore實體上運行BigQuery,但您可以編寫一個Mapper管道,從DataStore中讀取實體,將它們寫入CSV雲存儲中,然後將這些實例吸收到BigQuery中 - 甚至可以自動化該過程。下面是一個使用Mapper API類只是數據存儲到CSV步驟示例:

import re 
import time 
from datetime import datetime 
import urllib 
import httplib2 
import pickle 

from google.appengine.ext import blobstore 
from google.appengine.ext import db 
from google.appengine.ext import webapp 

from google.appengine.ext.webapp.util import run_wsgi_app 
from google.appengine.ext.webapp import blobstore_handlers 
from google.appengine.ext.webapp import util 
from google.appengine.ext.webapp import template 

from mapreduce.lib import files 
from google.appengine.api import taskqueue 
from google.appengine.api import users 

from mapreduce import base_handler 
from mapreduce import mapreduce_pipeline 
from mapreduce import operation as op 

from apiclient.discovery import build 
from google.appengine.api import memcache 
from oauth2client.appengine import AppAssertionCredentials 


#Number of shards to use in the Mapper pipeline 
SHARDS = 20 

# Name of the project's Google Cloud Storage Bucket 
GS_BUCKET = 'your bucket' 

# DataStore Model 
class YourEntity(db.Expando): 
    field1 = db.StringProperty() # etc, etc 

ENTITY_KIND = 'main.YourEntity' 


class MapReduceStart(webapp.RequestHandler): 
    """Handler that provides link for user to start MapReduce pipeline. 
    """ 
    def get(self): 
    pipeline = IteratorPipeline(ENTITY_KIND) 
    pipeline.start() 
    path = pipeline.base_path + "/status?root=" + pipeline.pipeline_id 
    logging.info('Redirecting to: %s' % path) 
    self.redirect(path) 


class IteratorPipeline(base_handler.PipelineBase): 
    """ A pipeline that iterates through datastore 
    """ 
    def run(self, entity_type): 
    output = yield mapreduce_pipeline.MapperPipeline(
     "DataStore_to_Google_Storage_Pipeline", 
     "main.datastore_map", 
     "mapreduce.input_readers.DatastoreInputReader", 
     output_writer_spec="mapreduce.output_writers.FileOutputWriter", 
     params={ 
      "input_reader":{ 
       "entity_kind": entity_type, 
       }, 
      "output_writer":{ 
       "filesystem": "gs", 
       "gs_bucket_name": GS_BUCKET, 
       "output_sharding":"none", 
       } 
      }, 
      shards=SHARDS) 


def datastore_map(entity_type): 
    props = GetPropsFor(entity_type) 
    data = db.to_dict(entity_type) 
    result = ','.join(['"%s"' % str(data.get(k)) for k in props]) 
    yield('%s\n' % result) 


def GetPropsFor(entity_or_kind): 
    if (isinstance(entity_or_kind, basestring)): 
    kind = entity_or_kind 
    else: 
    kind = entity_or_kind.kind() 
    cls = globals().get(kind) 
    return cls.properties() 


application = webapp.WSGIApplication(
            [('/start', MapReduceStart)], 
            debug=True) 

def main(): 
    run_wsgi_app(application) 

if __name__ == "__main__": 
    main() 

如果追加此您IteratorPipeline類的末尾:yield CloudStorageToBigQuery(output),你可以管得到的CSV文件句柄到BigQuery擷取管道。 ..這樣的:

class CloudStorageToBigQuery(base_handler.PipelineBase): 
    """A Pipeline that kicks off a BigQuery ingestion job. 
    """ 
    def run(self, output): 

# BigQuery API Settings 
SCOPE = 'https://www.googleapis.com/auth/bigquery' 
PROJECT_ID = 'Some_ProjectXXXX' 
DATASET_ID = 'Some_DATASET' 

# Create a new API service for interacting with BigQuery 
credentials = AppAssertionCredentials(scope=SCOPE) 
http = credentials.authorize(httplib2.Http()) 
bigquery_service = build("bigquery", "v2", http=http) 

jobs = bigquery_service.jobs() 
table_name = 'datastore_dump_%s' % datetime.utcnow().strftime(
    '%m%d%Y_%H%M%S') 
files = [str(f.replace('/gs/', 'gs://')) for f in output] 
result = jobs.insert(projectId=PROJECT_ID, 
        body=build_job_data(table_name,files)).execute() 
logging.info(result) 

def build_job_data(table_name, files): 
    return {"projectId": PROJECT_ID, 
      "configuration":{ 
       "load": { 
        "sourceUris": files, 
        "schema":{ 
         # put your schema here 
         "fields": fields 
         }, 
        "destinationTable":{ 
         "projectId": PROJECT_ID, 
         "datasetId": DATASET_ID, 
         "tableId": table_name, 
         }, 
        } 
       } 
      } 
2

不,BigQuery是需要將數據上傳到其中的不同產品。它無法在數據存儲上運行。您可以使用GQL查詢數據存儲。

3

對於BigQuery,您必須將這些類別導出爲CSV或分隔記錄結構,然後加載到BigQuery中,然後您可以進行查詢。我不知道哪個設施可以查詢實時GAE數據存儲。

Biquery是分析查詢引擎,意味着您無法更改記錄。不允許更新或刪除,只能追加。

5

我們正在做一個受信任的測試程序從數據存儲移動到BigQuery的兩個簡單的操作:

  1. 備份ŧ他的數據存儲採用直接數據存儲區管理的備份功能
  2. 導入備份至BigQuery

它會自動處理模式的爲您服務。

更多信息(適用):https://docs.google.com/a/google.com/spreadsheet/viewform?formkey=dHdpeXlmRlZCNWlYSE9BcE5jc2NYOUE6MQ

+0

那麼發生了什麼?任何關於TTP命運的更新? – gae123

+0

是啊,已經有一段時間了 – ZiglioUK

+0

也有興趣 – Omri

6

有了新的(從2013年9月)streaming inserts api你可以從你的應用程序到BigQuery的進口記錄。

數據在BigQuery中立即可用,所以這應該滿足您的實時需求。

雖然這個問題現在是有點老了,這可能是任何人碰到這個問題

絆腳石此刻雖然得到這個從工作的本地開發服務器不是特別好一個簡單的解決方案。

1

截至2016年,現在這是非常可能的!你必須做到以下幾點:

  1. 使用使用數據庫管理
  2. 備份實體在console.developers.google.com我有一個完整的教程
  3. 頭至BigQuery的Web UI做在谷歌存儲的新桶,並導入步驟1中生成的文件。

有關此工作流程的完整示例,請參閱this post

相關問題